# Research Questions

**1. Think about the following two situations:
a. Analyze the data of recent power usage statistics reported to a power station and
adjust the power generation rate if necessary.
b. A school needs to keep data about its students and query it by course, age,
and instructor.
Which system would you choose for each situation? If you choose DSMS, explain in
detail how you would implement it.**

a. For the first situation a DSMS is the best option. A DSMS is designed to handle potentially unbounded, rapidly changing data streams with flexible processing, even though it has only limited resources such as main memory. The information needed can be expressed as queries, and the DSMS executes them as continuous queries: instead of being run once, a query is installed permanently and re-evaluated as new readings arrive, until it is explicitly uninstalled. In this way the power station can monitor recent usage continuously and adjust the power generation rate whenever necessary.
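A minimal sketch of such a continuous query, assuming readings arrive as `(timestamp_in_seconds, kw)` pairs and that the station sets its generation rate to the average load over a sliding time window plus a safety margin. The class name, the 60-second window and the 10% headroom are hypothetical choices for illustration, not part of any real DSMS API.

```python
from collections import deque


class PowerUsageQuery:
    """Hypothetical continuous query: average load over the last `window_s` seconds."""

    def __init__(self, window_s, headroom=0.10):
        self.window_s = window_s
        self.headroom = headroom  # assumed safety margin above the average load
        self.readings = deque()   # (timestamp, kw) pairs inside the window
        self.total_kw = 0.0       # running sum, so each update avoids a full rescan
        self.installed = True     # the query stays active until uninstalled

    def on_reading(self, ts, kw):
        """Called for every arriving tuple; returns the recommended generation rate."""
        if not self.installed:
            return None
        self.readings.append((ts, kw))
        self.total_kw += kw
        # Expire readings that fell out of the window (ts - window_s, ts].
        while self.readings and self.readings[0][0] <= ts - self.window_s:
            _, old_kw = self.readings.popleft()
            self.total_kw -= old_kw
        avg = self.total_kw / len(self.readings)
        return avg * (1 + self.headroom)

    def uninstall(self):
        """Explicitly remove the continuous query."""
        self.installed = False


query = PowerUsageQuery(window_s=60)
for ts, kw in [(0, 100.0), (30, 200.0), (90, 300.0)]:
    print(ts, round(query.on_reading(ts, kw), 1))
```

The query is evaluated once per arriving reading and keeps only the readings inside the window, which matches the limited-memory setting described above; with the sample input it recommends 110.0, 165.0 and 330.0.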

b. For the second situation the best option is a DBMS. The student data does not have to be processed continuously in order to trigger changes in a system (which is what the constantly running queries of a DSMS are for); it is stored once and queried on demand. What is needed is a software system that lets users define, create, maintain and control access to the database, which in this case is a student database.
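For contrast, a DBMS answers one-time queries over data that is already stored. A minimal sketch using Python's built-in `sqlite3` module; the `students` table, its columns and its rows are hypothetical, chosen only to match the attributes mentioned in the question (course, age, instructor).

```python
import sqlite3

# In-memory database; the schema and rows are hypothetical examples.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE students (name TEXT, age INTEGER, course TEXT, instructor TEXT)"
)
conn.executemany(
    "INSERT INTO students VALUES (?, ?, ?, ?)",
    [
        ("Ana", 20, "Databases", "Smith"),
        ("Ben", 22, "Databases", "Smith"),
        ("Eva", 21, "Networks", "Jones"),
    ],
)

# A one-time query: it runs once over the stored data and then finishes.
rows = conn.execute(
    "SELECT name, age FROM students WHERE instructor = ? ORDER BY age",
    ("Smith",),
).fetchall()
print(rows)
```

Unlike the continuous query, nothing stays installed after `fetchall()` returns; running the query again is an explicit new request.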

**2. Explain three of the most typical strategies for CQ processing, and for each of them,
think of scenarios where it would be more convenient to apply it.**


If the input rate of each stream in a DSMS is known in advance, which amounts to knowing how many tuples will arrive in future time slots, we can find an optimal scheduling strategy that gives the best performance while using minimal resources. Most of the time, however, the input rate of a data stream is unpredictable and highly bursty, which makes it difficult to find a feasible, optimal scheduling strategy.

**FIFO** (First In, First Out): Tuples are processed in the order of their arrival. Once a tuple is scheduled, it is processed by the operators along its OP (operator path) until it is consumed by an intermediate operator or output at the root node. Then the next oldest tuple is scheduled.
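A minimal sketch of FIFO scheduling, assuming an operator path is modelled as a list of Python functions, each returning the transformed tuple or `None` when the operator consumes (drops) it. The operators in the example are hypothetical.

```python
from collections import deque


def fifo_schedule(arrivals, operator_path):
    """Process tuples strictly in arrival order.

    Each tuple is pushed through the whole operator path before the next
    (oldest remaining) tuple is scheduled.
    """
    queue = deque(arrivals)  # oldest tuple on the left
    output = []
    while queue:
        tup = queue.popleft()
        for op in operator_path:
            tup = op(tup)
            if tup is None:  # consumed by an intermediate operator (e.g. a filter)
                break
        else:
            output.append(tup)  # passed every operator: emitted at the root node
    return output


# Hypothetical operator path: a filter followed by a map.
path = [lambda x: x if x > 50 else None, lambda x: x * 2]
print(fifo_schedule([10, 60, 80, 40], path))
```

With this input the call prints `[120, 160]`. Note that the scheduler never looks at how selective or how fast each operator is, which is exactly the limitation discussed below.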

**Chain Strategy**: A near-optimal scheduling strategy in terms of total internal queue size. At any time, consider all tuples that are currently in the system; of these, schedule for a single time unit the tuple that lies on the segment with the steepest slope in its lower envelope simulation. If there are multiple such tuples, select the one with the earliest arrival time.
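The "lower envelope" can be made concrete with a progress chart: for one operator path, plot cumulative processing time against the remaining tuple size (the product of the selectivities applied so far). The sketch below assumes each operator is described only by a hypothetical `(time_per_tuple, selectivity)` pair with positive time, builds the envelope by repeatedly jumping to the later point reached by the steepest descending segment, and then applies the selection rule quoted above. It is an illustration of the idea, not a full Chain scheduler.

```python
def progress_chart(ops):
    """ops: list of (time_per_tuple, selectivity) along one operator path (times > 0)."""
    points = [(0.0, 1.0)]  # (cumulative time, remaining size)
    t, size = 0.0, 1.0
    for cost, selectivity in ops:
        t += cost
        size *= selectivity
        points.append((t, size))
    return points


def lower_envelope(points):
    """Return segments (start_index, end_index, slope) of the lower envelope."""
    segments = []
    i = 0
    while i < len(points) - 1:
        t0, s0 = points[i]
        best_j, best_slope = i + 1, None
        for j in range(i + 1, len(points)):
            t1, s1 = points[j]
            slope = (s1 - s0) / (t1 - t0)
            if best_slope is None or slope < best_slope:  # most negative = steepest
                best_j, best_slope = j, slope
        segments.append((i, best_j, best_slope))
        i = best_j
    return segments


def slope_at(segments, position):
    """Slope of the envelope segment for a tuple that has passed `position` operators."""
    for start, end, slope in segments:
        if start <= position < end:
            return slope
    raise ValueError("tuple has already left the operator path")


def chain_pick(tuples, segments):
    """tuples: list of (arrival_time, position). Steepest slope wins; ties -> earliest arrival."""
    return min(tuples, key=lambda tp: (slope_at(segments, tp[1]), tp[0]))


ops = [(1.0, 0.2), (1.0, 1.0), (2.0, 0.5)]  # hypothetical operators
segs = lower_envelope(progress_chart(ops))
print(segs)
print(chain_pick([(0, 1), (5, 0)], segs))
```

In the example the newer tuple `(5, 0)` is picked because it sits before the very selective first operator: scheduling it shrinks the queues fastest, even though an older tuple is waiting. That is why Chain reduces queue size at the cost of latency.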

The Chain strategy performs better than FIFO in terms of total internal queue size, but worse than FIFO in terms of tuple latency. The FIFO strategy does not consider inherent properties of an operator such as selectivity and processing rate. Because the Chain strategy pushes tuples from the bottom, it inherits the bursty property of the input stream: if the input streams are bursty, the output of the query processing system under the Chain strategy is bursty too.


**Greedy**: The query optimizer performs an exhaustive search of all possible plans for executing a query. It computes a cost estimate for each possible plan and chooses the cheapest one according to its cost model.
For queries involving large numbers of tables, the time spent enumerating the plans and estimating their costs can be unacceptable. The query optimizer therefore includes an alternative mechanism for generating query plans, called greedy optimization, that can greatly reduce compile time. Rather than enumerating all possible query plans (including all permutations of table join order, all combinations of tables and useful secondary indexes, and all “shapes” of query plans), the “greedy” enumeration heuristic starts with small plan fragments and uses them as building blocks to construct the final query plan. At each stage of plan construction the lowest-cost fragment is chosen, so even though many potential plans are never considered, the ones that are chosen are still based on cost estimation.
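A minimal sketch of greedy plan construction for join ordering, assuming a toy cost model: the size of a join is the product of its input sizes times one fixed selectivity, and the plan cost is the sum of the intermediate result sizes. Table names, cardinalities and the selectivity are hypothetical; real optimizers use much richer cost models.

```python
def greedy_join_order(cardinalities, selectivity=0.01):
    """Build a left-deep join order by always adding the table that yields the
    cheapest (smallest) intermediate result at the current step."""
    remaining = dict(cardinalities)
    # Start from the cheapest fragment: the smallest table.
    first = min(remaining, key=remaining.get)
    order = [first]
    current_size = remaining.pop(first)
    total_cost = 0.0
    while remaining:
        # Local choice only: earlier decisions are never revisited.
        nxt = min(remaining, key=lambda t: current_size * remaining[t])
        current_size = current_size * remaining.pop(nxt) * selectivity
        total_cost += current_size
        order.append(nxt)
    return order, total_cost


order, cost = greedy_join_order({"orders": 10_000, "customers": 1_000, "nations": 25})
print(order, cost)
```

Only one candidate per step is kept, so the number of plans examined grows roughly quadratically with the number of tables instead of factorially, at the risk of missing the globally cheapest order.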

**3. Mention the main characteristics of a Query Scheduler in a Data Stream Management
System. What do you think is the biggest problem of the scheduling in comparison with a
Data Base Management System?**

Characteristics:

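1. The scheduler needs to provide rate synchronization between different operators.

2. It has to cope with time-varying arrival rates of data streams and time-varying output rates of operators.

Because a DSMS runs continuous queries, scheduling decisions must take into account:

- memory allocation across operators
- management of buffers for incoming streams
- performance requirements of individual queries

None of this is as critical in a DBMS: its data is stored on disk, which is practically unlimited compared with main memory, and each query runs once over that data at a pace the system itself controls. The biggest problem of scheduling in a DSMS is therefore keeping up with uncontrolled, bursty input while only limited main memory is available for buffering.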
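The need for rate synchronization can be seen with a tiny simulation, assuming a single operator that processes a fixed number of tuples per time step while a hypothetical bursty stream pushes tuples into its input buffer.

```python
def simulate_queue(arrivals_per_step, service_rate):
    """Return the number of buffered tuples after each time step."""
    backlog = 0
    history = []
    for arrived in arrivals_per_step:
        backlog += arrived                     # tuples pushed by the stream
        backlog -= min(backlog, service_rate)  # tuples the operator processes this step
        history.append(backlog)
    return history


bursty = [2, 2, 20, 20, 0, 0, 0, 2]  # hypothetical bursty arrival pattern
print(simulate_queue(bursty, service_rate=5))
```

This prints `[0, 0, 15, 30, 25, 20, 15, 12]`: the average arrival rate (5.75 tuples per step) is only slightly above the service rate, yet a single burst leaves up to 30 tuples buffered in memory, and the backlog is still not drained at the end.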

# Development

**1. Write queries in CQL (the objective is to understand the queries; the script is written in CQL, the Cassandra Query Language, not in SQL)**

*Column family database, which will have a single tweet stored per row; the columns contain information about each tweet, e.g. the time of the tweet, the name of the user publishing it, the text of the tweet and an id which uniquely identifies the tweet.*

```sql
CREATE TABLE tweets (tweetid text, tweettime text, tweet text, postedby text, PRIMARY KEY (tweetid));
```

*This CQL command reads the timeline of user1.*

```sql
SELECT * FROM user1timeline;
```

*The following CQL command creates the column family for user1.*

```sql
CREATE TABLE user1timeline (tweetnode int, tweettime text, tweetid text, PRIMARY KEY (tweetnode, tweettime, tweetid)) WITH CLUSTERING ORDER BY (tweettime DESC, tweetid ASC);
```

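*We run the following CQL commands to carry out the write operation: tweet t1 by user1 is stored in tweets, and its id is added to the timelines of user2 and user3 (assuming user2timeline and user3timeline were created like user1timeline):*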

```sql
INSERT INTO tweets (tweetid, tweettime, tweet, postedby) VALUES ('t1', '23:45', 'Hello World!!!', 'user1');
INSERT INTO user2timeline (tweetnode, tweettime, tweetid) VALUES (201708, '23:45', 't1');
INSERT INTO user3timeline (tweetnode, tweettime, tweetid) VALUES (201708, '23:45', 't1');
```

*Read operation*

```sql
SELECT * FROM user2timeline;
SELECT * FROM user3timeline;
```
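The write path above follows a fan-out-on-write design: a tweet is stored once in `tweets`, and its id is copied into each reader's timeline table, so reading a timeline is a single read already sorted by time. A minimal in-memory model of the same idea, assuming plain Python dicts stand in for the Cassandra tables (no Cassandra driver is used) and a hypothetical follower graph in which user2 and user3 receive user1's tweets.

```python
from collections import defaultdict

tweets = {}                    # tweetid -> (tweettime, tweet, postedby)
timelines = defaultdict(list)  # user -> list of (tweettime, tweetid)
followers = {"user1": ["user2", "user3"]}  # hypothetical follower graph


def post_tweet(tweetid, tweettime, text, user):
    """Write once to `tweets`, then fan out the id to every follower timeline."""
    tweets[tweetid] = (tweettime, text, user)
    for follower in followers.get(user, []):
        timelines[follower].append((tweettime, tweetid))


def read_timeline(user):
    """Mimic CLUSTERING ORDER BY (tweettime DESC, tweetid ASC).

    Sort by id ascending first, then stably by time descending. Times are
    'HH:MM' strings, which compare correctly as text.
    """
    by_id = sorted(timelines[user], key=lambda e: e[1])
    entries = sorted(by_id, key=lambda e: e[0], reverse=True)
    return [(t, tid, tweets[tid][1]) for t, tid in entries]


post_tweet("t1", "23:45", "Hello World!!!", "user1")
post_tweet("t2", "23:50", "Second tweet", "user1")  # hypothetical second tweet
print(read_timeline("user2"))
```

The cost of this design is paid at write time (one insert per follower); in exchange, reads never need a join, which suits a column family store.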

**2. Implement a query processing script in Python over a Twitter data stream**

Here I implement query processing over a Twitter data stream that computes the percentage of tweets containing the word 'love' over 2 minutes.

```python
from __future__ import absolute_import, print_function
import tweepy  # requires tweepy 3.x: StreamListener was removed in tweepy 4.0
from tweepy import OAuthHandler, Stream, StreamListener
import json
import time

# Twitter API credentials (kept private here; replace with your own)
consumer_key = 'YOUR_CONSUMER_KEY'
consumer_secret = 'YOUR_CONSUMER_SECRET'

access_token = 'YOUR_ACCESS_TOKEN'
access_token_secret = 'YOUR_ACCESS_TOKEN_SECRET'
```

```python
class StdOutListener(StreamListener):
    """Counts the tweets received from the stream and those containing 'love';
    after 2 minutes it prints the percentage and disconnects.
    """
    def __init__(self):
        super().__init__()
        self.t0 = time.time()  # start time
        self.cnt = 0           # counter for tweets
        self.cnt_love = 0      # counter for tweets containing 'love'

    def on_data(self, data):
        data = json.loads(data)
        try:
            if data['truncated']:  # extended tweet: the full text is in extended_tweet
                text = data['extended_tweet']['full_text']
            else:
                text = data['text']
        except KeyError:
            return True  # not a tweet (e.g. a delete or limit notice): skip it
        self.cnt += 1
        if 'love' in text:  # substring test, so e.g. 'lovely' also counts
            self.cnt_love += 1
        if time.time() - self.t0 > 120:  # 2 minutes, 120 seconds
            if self.cnt > 0:
                print((self.cnt_love / self.cnt) * 100)
            return False  # returning False from on_data disconnects the stream
        return True

    def on_error(self, status):
        print(status)
        if status == 420:  # rate limited: disconnect instead of reconnecting
            return False


l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

stream = Stream(auth, l)
# We filter the tweets that contain 'the'; of those, we give the percentage that contain 'love'
stream.filter(track=['the'])
```

1.8278497839813894


**3. Give the answer of the queries below.**

**● percentage of tweets mentioning “Yes” during 2 minutes, from all tweets, without
restrictions.**

```python
class StdOutListener(StreamListener):
    """Counts the tweets received from the stream and those containing 'Yes';
    after 2 minutes it prints the percentage and disconnects.
    """
    def __init__(self):
        super().__init__()
        self.t0 = time.time()  # start time
        self.cnt = 0           # counter for tweets
        self.cnt_yes = 0       # counter for tweets containing 'Yes'

    def on_data(self, data):
        data = json.loads(data)
        try:
            if data['truncated']:  # extended tweet: the full text is in extended_tweet
                text = data['extended_tweet']['full_text']
            else:
                text = data['text']
        except KeyError:
            return True  # not a tweet (e.g. a delete or limit notice): skip it
        self.cnt += 1
        if 'Yes' in text:
            self.cnt_yes += 1
        if time.time() - self.t0 > 120:  # 2 minutes, 120 seconds
            if self.cnt > 0:
                print((self.cnt_yes / self.cnt) * 100)
            return False  # returning False from on_data disconnects the stream
        return True

    def on_error(self, status):
        print(status)
        if status == 420:  # rate limited: disconnect instead of reconnecting
            return False


l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

stream = Stream(auth, l)
# We filter the tweets that contain 'the'; of those, we give the percentage that contain 'Yes'
# (stream.sample() would instead deliver a random sample of all tweets, without a keyword filter)
stream.filter(track=['the'])
```

0.4165972337943676


**● (sliding windows) Use sliding windows, and calculate the percentage of tweets that
mention “Yes”. Use a window size of 300 tweets.**

```python
class StdOutListener(StreamListener):
    """Sliding window of the last 300 tweets. Each time the window is full and a
    new tweet arrives, the whole window is scanned for 'Yes'. The counters add up
    over all windows, so the final figure is the average percentage per window.
    """
    def __init__(self):
        super().__init__()
        self.t0 = time.time()   # start time
        self.cnt = 0            # tweets scanned, summed over all windows
        self.cnt_yes = 0        # tweets containing 'Yes', summed over all windows
        self.list_300 = list()  # sliding window of at most 300 tweet texts

    def on_data(self, data):
        data = json.loads(data)
        try:
            if data['truncated']:  # extended tweet: the full text is in extended_tweet
                text = data['extended_tweet']['full_text']
            else:
                text = data['text']
        except KeyError:
            return True  # not a tweet: skip it
        if len(self.list_300) >= 300:  # window full: count 'Yes' over the whole window
            for tweet in self.list_300:
                if 'Yes' in tweet:
                    self.cnt_yes += 1
                self.cnt += 1
            self.list_300.pop(0)  # slide: remove the oldest tweet
        self.list_300.append(text)  # add the newest tweet
        if time.time() - self.t0 > 120:  # at 2 minutes, 120 seconds, print the percentage
            if self.cnt > 0:  # nothing to report if the window never filled up
                print((self.cnt_yes / self.cnt) * 100)
            return False
        return True

    def on_error(self, status):
        print(status)
        if status == 420:  # rate limited: disconnect instead of reconnecting
            return False


l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

stream = Stream(auth, l)
# We filter the tweets that contain 'the'; of those, we give the percentage that contain 'Yes'
stream.filter(track=['the'])
```

0.4445418932351049


**● (batching) percentage of tweets mentioning “No” during 2 minutes. Assume that the
counting function has a very slow rate, use buffers (length 100) for the elements and
compute the query answer using each tweet once.**

```python
class StdOutListener(StreamListener):
    """Batching: tweets are buffered in batches of 100 and the (slow) counting
    step runs once per full batch, so each tweet is counted only once
    (tweets in the last, incomplete batch are not counted).
    """
    def __init__(self):
        super().__init__()
        self.t0 = time.time()   # start time
        self.cnt = 0            # counter for tweets
        self.cnt_no = 0         # counter for tweets containing 'No'
        self.list_100 = list()  # buffer of up to 100 tweet texts

    def on_data(self, data):
        data = json.loads(data)
        try:
            if data['truncated']:  # extended tweet: the full text is in extended_tweet
                text = data['extended_tweet']['full_text']
            else:
                text = data['text']
        except KeyError:
            return True  # not a tweet: skip it
        self.list_100.append(text)  # every tweet goes into the buffer
        if len(self.list_100) >= 100:  # buffer full: count the word over the batch
            for tweet in self.list_100:
                if 'No' in tweet:  # substring test: also matches e.g. 'Now' or 'Not'
                    self.cnt_no += 1
                self.cnt += 1
            self.list_100 = list()  # clear the buffer
        if time.time() - self.t0 > 120:  # at 2 minutes, 120 seconds, print the percentage
            if self.cnt > 0:
                print((self.cnt_no / self.cnt) * 100)
            return False
        return True

    def on_error(self, status):
        print(status)
        if status == 420:  # rate limited: disconnect instead of reconnecting
            return False


l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

stream = Stream(auth, l)
# We filter the tweets that contain 'the'; of those, we give the percentage that contain 'No'
stream.filter(track=['the'])
```

3.9833333333333334


**● (sampling) percentage of tweets mentioning “Hi” during 2 minutes. Assume that the
update function is slow. Update the list with a sample of the elements. (e.g., one in one
hundred).**

```python
class StdOutListener(StreamListener):
    """Sampling: only one tweet in every 50 is parsed and checked for 'Hi';
    after 2 minutes the percentage over the sampled tweets is printed.
    """
    def __init__(self):
        super().__init__()
        self.t0 = time.time()  # start time
        self.cnt_s = 0         # counter for sampling
        self.cnt = 0           # counter for sampled tweets
        self.cnt_hi = 0        # counter for sampled tweets containing 'Hi'

    def on_data(self, data):
        self.cnt_s += 1
        if self.cnt_s >= 50:  # analyse one tweet in every 50; the others are not even parsed
            data = json.loads(data)
            try:
                if data['truncated']:  # extended tweet: the full text is in extended_tweet
                    text = data['extended_tweet']['full_text']
                else:
                    text = data['text']
            except KeyError:
                return True  # not a tweet: sample the next message instead
            self.cnt += 1
            if 'Hi' in text:
                self.cnt_hi += 1
            self.cnt_s = 0  # restart the sampling counter
        if time.time() - self.t0 > 120:  # 2 minutes, 120 seconds
            if self.cnt > 0:
                print((self.cnt_hi / self.cnt) * 100)
            return False
        return True

    def on_error(self, status):
        print(status)
        if status == 420:  # rate limited: disconnect instead of reconnecting
            return False


l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

stream = Stream(auth, l)
# We filter the tweets that contain 'the'; of those, we give the percentage that contain 'Hi'
stream.filter(track=['the'])
```

0.9433962264150944


# Conclusions

There are different ways to answer a data stream query when one of the two operations, updating the state and computing the answer, is slower than the other. Batch processing helps when the update operation is fast but computing the answer is slow, and sampling helps in the opposite case.

Selecting recent data for processing queries improves the analysis, because it excludes old data that might be irrelevant. However, in my implementation above the window has a size of 300, and every time the window slides the search for the word 'Yes' is repeated over the 299 tweets that were already analyzed before. I think this increases the computing time unnecessarily.
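One way to avoid rescanning the window, sketched under the assumption that tweet texts are fed in one at a time: keep the window in a `collections.deque` together with a running count of matching texts, and adjust the count only for the text that enters and the one that leaves. Each slide then costs O(1) instead of O(300). The class name is hypothetical, and it reports the percentage for the current window only.

```python
from collections import deque


class SlidingWindowCounter:
    """Percentage of texts containing `word` among the last `size` texts."""

    def __init__(self, word, size=300):
        self.word = word
        self.size = size
        self.window = deque()  # most recent texts, oldest on the left
        self.matches = 0       # how many texts in the window contain `word`

    def add(self, text):
        if len(self.window) == self.size:  # window full: evict the oldest text
            old = self.window.popleft()
            if self.word in old:
                self.matches -= 1
        self.window.append(text)
        if self.word in text:
            self.matches += 1

    def percentage(self):
        return 100 * self.matches / len(self.window) if self.window else 0.0


counter = SlidingWindowCounter("Yes", size=3)
for text in ["Yes!", "no", "Yes we can", "maybe"]:
    counter.add(text)
print(counter.percentage())
```

Inside `on_data`, the listener would call `counter.add(text)` for each tweet and print `counter.percentage()` at the end. A plain deque is used instead of `deque(maxlen=...)` because the evicted text must be inspected to keep the count correct.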

There are plenty of possibilities for data analysis, and plenty of potentially useful information can be extracted from Twitter using a developer account and tweepy.