# Building an ETL Pipeline

As the second part of the Gather predict, you will need to build a pipeline of Python functions that does the following:

1. Connects to Twitter and scrapes "Eskom_SA" tweets.
<br>
<br>
2. Cleans/processes the scraped tweets, creating a dataframe with two new columns, using the following function: <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; a) Hashtag Remover from Analyse Functions
<br>
<br>
3. Connects to your SQL database and uploads the tweets into the table where you store tweets.

In [3]:
# General:
import tweepy           # To consume Twitter's API
import pandas as pd     # To handle data
import numpy as np      # For numerical computation
import json             # For loading credentials
import pyodbc           # For processing SQL queries


# Consumer and Access details

Fill in the Consumer and Access details you should have received when applying for a Twitter API key.

In [4]:
# Read the secret Twitter API consumer/access details from a local JSON file
path_to_json = "./tokens.json"
with open(path_to_json, 'r') as handler:
    info = json.load(handler)

# Consumer key pair:
CONSUMER_KEY, CONSUMER_SECRET = info['CONSUMER_KEY'], info['CONSUMER_SECRET']

# Access token pair:
ACCESS_TOKEN, ACCESS_SECRET = info['ACCESS_TOKEN'], info['ACCESS_SECRET']

In [5]:
# API's setup:
def twitter_setup():
    """
    Build an authenticated handle on Twitter's API.

    The consumer and access credentials are read from the module-level
    CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_SECRET names.

    Returns
    -------
    A tweepy API object created with the OAuth handler and a timeout of 1000.
    """
    # OAuth handler carrying the consumer pair, then the access pair
    oauth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    oauth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # Hand the authenticated API object straight back to the caller
    return tweepy.API(oauth, timeout=1000)

# Function 1:

Write a function which:
- Scrapes _"Eskom_SA"_ tweets from Twitter. 

Function Specifications:
- The function should return a dataframe with the scraped tweets with just the "_Tweets_" and "_Date_". 
- Will take in the ```consumer key,  consumer secret code, access token``` and ```access secret code```.

NOTE:
The dataframe should have the same column names as those in your SQL Database table where you store the tweets.

In [6]:
def twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET):
    """Connect to the Twitter API and scrape the 'Eskom_SA' timeline.

    Parameters
    ----------
    CONSUMER_KEY: String
        Consumer key issued by Twitter for the API application.
    CONSUMER_SECRET: String
        Consumer secret matching the consumer key.
    ACCESS_TOKEN: String
        Access token issued by Twitter.
    ACCESS_SECRET: String
        Access secret matching the access token.

    Returns
    -------
    data: Pandas dataframe
        A dataframe of scraped tweets containing the columns: Tweets, Date.
        Both columns have object dtype, also when no tweets are returned.

    Notes
    -----
    The result is also bound to the module-level name ``data``, which a
    later notebook cell passes to ``extract_municipality_hashtags``.
    """
    global data

    # Authenticate with the credentials passed in. They shadow the
    # module-level constants of the same name, so the arguments are
    # actually honoured instead of being silently ignored.
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    extractor = tweepy.API(auth, timeout=1000)

    # Request up to 200 tweets in one call, leaving out retweets.
    tweets = extractor.user_timeline(screen_name='Eskom_SA', count=200, include_rts=False)

    # Explicit object dtype keeps the cells as plain str / datetime values
    # and keeps an empty result usable with the .str accessor downstream.
    data = pd.DataFrame(
        {
            'Tweets': pd.Series([tweet.text for tweet in tweets], dtype=object),
            'Date': pd.Series([tweet.created_at for tweet in tweets], dtype=object),
        },
        columns=['Tweets', 'Date'],
    )
    return data

# Function 2: Removing hashtags and the municipalities

Write a function which:
- Uses the function you wrote in the Analyse section to extract the hashtags and municipalities into their own columns in a new dataframe.

Function Specifications:
- The function should take in the pandas dataframe you created in Function 1 and return a new pandas dataframe. 

In [7]:
# Scrape the timeline; besides returning the frame for display, twitter_df
# binds it to the module-level name `data` (via its `global data` statement).
twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET )

Unnamed: 0,Tweets,Date
0,Only Eskom trained &amp; authorized personnel ...,2020-03-11 10:33:00
1,#POWERALERT 1\n\nDate: 11 March 2020\n\nStage ...,2020-03-11 09:58:19
2,We are currently loadshedding in stage 4. \n\...,2020-03-11 07:42:17
3,"Before you leave for work/school, take food ou...",2020-03-11 04:00:00
4,Eskom is currently loadshedding in stage 2\n\n...,2020-03-11 03:39:22
...,...,...
177,@Aut771 Eskom is currently not loadshedding. P...,2020-02-24 08:47:27
178,@Makimofokeng4P Eskom is currently not loadshe...,2020-02-24 08:46:34
179,@Gigi_Mhayise #ICYMI We have been sharing how ...,2020-02-24 08:35:37
180,@Mrs_Hashtag Eskom is currently not loadsheddi...,2020-02-24 08:19:56


In [8]:
def extract_municipality_hashtags(df):
    """Extract the municipality and the hashtags mentioned in each tweet.

    Parameters
    ----------
    df: Pandas dataframe
        A dataframe with a 'Tweets' text column. The new columns are
        inserted at positions 2 and 3, so the frame needs at least two
        columns (Tweets and Date in this pipeline). `df` is not modified.

    Returns
    -------
    df_working: Pandas dataframe
        A deep copy of `df` with two extra object-dtype columns:

        municipality
            Name of the last municipality mentioned in the tweet, either by
            Twitter handle (e.g. '@CityPowerJhb' gives 'Johannesburg') or by
            name (e.g. 'Cape Town'); NaN when none is mentioned.
        hashtags
            List of hashtags in order of appearance, each running from its
            '#' to the next whitespace; NaN when the tweet has none.

    Notes
    -----
    The result is also bound to the module-level name ``df_working``, which
    a later notebook cell passes to ``pyodbc_twitter``.
    """
    mun_dict = {'@CityofCTAlerts' : 'Cape Town',
                '@CityPowerJhb' : 'Johannesburg',
                '@eThekwiniM' : 'eThekwini' ,
                '@EMMInfo' : 'Ekurhuleni',
                '@centlecutility' : 'Mangaung',
                '@NMBmunicipality' : 'Nelson Mandela Bay',
                '@CityTshwane' : 'Tshwane'}
    global df_working
    df_working = df.copy(deep=True)

    # One alternation over handles and names. Handles come first so that
    # '@CityTshwane' is consumed as a handle rather than as the bare name
    # 'Tshwane' embedded in it. None of the entries contains a regex
    # metacharacter, so they can be joined without escaping.
    finder = '|'.join(list(mun_dict.keys()) + list(mun_dict.values()))

    # Object dtype keeps the .str accessor usable on an empty frame.
    tweets = df_working['Tweets'].astype(object)

    # Matching on the tweet text (not on whitespace-separated tokens) also
    # catches multi-word names such as 'Cape Town' and mentions followed by
    # punctuation, which used to be flagged but left as an empty string.
    mentions = tweets.str.findall(finder)
    tags = tweets.str.findall(r'#\S*')

    # findall yields a list per string and NaN for missing text. Keep the
    # last mention (later mentions override earlier ones) and translate
    # handles to municipality names; bare names map to themselves.
    municipality = [
        mun_dict.get(found[-1], found[-1]) if isinstance(found, list) and found else np.nan
        for found in mentions
    ]
    hashtags = [
        found if isinstance(found, list) and found else np.nan
        for found in tags
    ]

    # Wrap in object Series so that list-valued cells are stored as-is.
    df_working.insert(2, 'municipality',
                      pd.Series(municipality, index=df_working.index, dtype=object))
    df_working.insert(3, 'hashtags',
                      pd.Series(hashtags, index=df_working.index, dtype=object))

    return df_working

In [9]:
# `data` is the module-level frame bound by twitter_df (through its
# `global data` statement); preview the first 50 processed rows.
extract_municipality_hashtags(data).head(50)

Unnamed: 0,Tweets,Date,municipality,hashtags
0,Only Eskom trained &amp; authorized personnel ...,2020-03-11 10:33:00,,
1,#POWERALERT 1\n\nDate: 11 March 2020\n\nStage ...,2020-03-11 09:58:19,,[#POWERALERT]
2,We are currently loadshedding in stage 4. \n\...,2020-03-11 07:42:17,,
3,"Before you leave for work/school, take food ou...",2020-03-11 04:00:00,,
4,Eskom is currently loadshedding in stage 2\n\n...,2020-03-11 03:39:22,,
5,#POWERALERT 3\nDate: 10 March 2020\n\nStage 4 ...,2020-03-10 16:42:19,,[#POWERALERT]
6,#InTheNews #Loadshedding Find your load sheddi...,2020-03-10 16:01:00,,"[#InTheNews, #Loadshedding]"
7,#LoadShedding: Tips To Make Sure Your Home Sec...,2020-03-10 14:30:00,,[#LoadShedding:]
8,Report electricity theft anonymously to Eskom ...,2020-03-10 12:38:00,,
9,#POWERALERT 2\n\nDate: 10 March 2020\n\nStage ...,2020-03-10 11:50:38,,[#POWERALERT]


# Function 3: Updating SQL Database with pyODBC

Write a function which:
- Connects and updates your SQL database. 

Function Specifications:
- The function should take in a pandas dataframe created in Function 2. 
- Connect to your SQL database.
- Update the table you store your tweets in.
- Not return any output.

In [11]:
# Defining database variables
# Raw string: the backslash separates the host from the SQL Server instance
# name and must not be interpreted as an escape sequence.
server = r'EDSA-3H9PSM2\SQLEXPRESS'
database = 'gather_eskom'
twitter_table = 'TWEETS'

# Defining connection string for pyodbc.
# Built by explicit concatenation so that no source-code indentation ends up
# inside the connection string.
connection = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    + 'SERVER=' + server + ';'
    + 'DATABASE=' + database + ';'
    + 'Trusted_Connection=yes;'
)

In [29]:
def _blank_to_null(value):
    """Return `value` when it is a non-empty string, otherwise None (SQL NULL)."""
    return value if isinstance(value, str) and value else None


def _hashtags_to_text(value):
    """Flatten a hashtag cell to comma-separated text, or None when it is missing."""
    if isinstance(value, (list, tuple)):
        return ', '.join(value)
    return _blank_to_null(value)


def pyodbc_twitter(connection, df, twitter_table):
    """Upload processed tweets into the SQL table, skipping tweets already stored.

    The rows are first loaded into a staging table named TweetTemp, and only
    rows whose tweet text is not yet present in `twitter_table` are copied
    over. The staging table is dropped again before the commit.

    Parameters
    ----------
    connection: Connection object
        An open database connection (as returned by ``pyodbc.connect``)
        providing ``cursor()``, ``commit()`` and ``rollback()``.

    df: Pandas dataframe
        A dataframe with the columns: Date, Tweets, municipality, hashtags.
        Missing or empty municipality/hashtags cells are stored as NULL and
        lists of hashtags are stored as comma-separated text.

    twitter_table: String
        Name of the table in the SQL database in which tweets are stored,
        with the columns: Date, Tweet, Municipality, Hashtags. The name is
        concatenated into the SQL text (identifiers cannot be bound as
        parameters), so it must come from a trusted source.

    Returns
    -------
    None. On any error the transaction is rolled back and the error is
    re-raised.
    """
    create_temp = '''CREATE TABLE TweetTemp(Date varchar(50),
                    Tweet varchar(280), Municipality varchar(50), Hashtag varchar(140))'''
    temp_ins = '''INSERT INTO TweetTemp(Date, Tweet, Municipality, Hashtag)
                  VALUES (?, ?, ?, ?);'''
    # Columns are listed explicitly so that the copy does not depend on the
    # column order of the staging table.
    copy_new = ('INSERT INTO ' + twitter_table + '(Date, Tweet, Municipality, Hashtags) '
                'SELECT Date, Tweet, Municipality, Hashtag FROM TweetTemp '
                'WHERE Tweet NOT IN (SELECT Tweet FROM ' + twitter_table + ')')

    cursor = connection.cursor()
    try:
        cursor.execute(create_temp)

        for _, row in df.iterrows():
            # Type checks instead of `is np.nan` / `is ''`: identity tests
            # miss NaN values that are not the np.nan object itself.
            params = (row['Date'],
                      row['Tweets'],
                      _blank_to_null(row['municipality']),
                      _hashtags_to_text(row['hashtags']))
            cursor.execute(temp_ins, params)

        cursor.execute(copy_new)
        cursor.execute('''DROP TABLE TweetTemp''')
        connection.commit()
    except Exception:
        # Undo the partial work; otherwise the staging table could survive
        # in the open transaction and make the next run fail on CREATE TABLE.
        connection.rollback()
        raise
    finally:
        cursor.close()
    return None

In [30]:
# `df_working` is the module-level frame bound by the most recent call to
# extract_municipality_hashtags (through its `global df_working` statement).
pyodbc_twitter(connection, df_working, twitter_table)