# ETL Pipeline

An ETL pipeline is a set of processes that extract data from an input source, transform it, and load it into an output destination such as a database, data mart, or data warehouse for reporting, analysis, and data synchronization. The letters stand for Extract, Transform, and Load.
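
As a rough illustration of the three stages, here is a minimal sketch that uses only in-memory data. The sample records, the `destination` list standing in for a database table, and the function names are all assumptions made for this example; they are not part of the pipeline built in this notebook.

```python
def extract():
    # Extract: stand-in for an API call; the records are invented sample data.
    return [
        {"text": "  Stage 2 loadshedding from 16:00  ", "date": "2020-01-10"},
        {"text": "Power restored in most areas", "date": "2020-01-11"},
    ]


def transform(records):
    # Transform: trim whitespace and add a derived "length" field.
    cleaned = []
    for record in records:
        text = record["text"].strip()
        cleaned.append({"text": text, "date": record["date"], "length": len(text)})
    return cleaned


def load(rows, destination):
    # Load: append the transformed rows to the destination
    # (a plain list here, standing in for a database table).
    destination.extend(rows)


destination = []
load(transform(extract()), destination)
```

Each stage only depends on the output of the previous one, which is the same shape the three functions in this notebook follow.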

For this pipeline, the functions below do the following:

1. Connect to Twitter and scrape the "Eskom_SA" tweets.
<br>
<br>
2. Clean and process the scraped tweets, producing a dataframe with two new columns, using the following function: <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; a) Hashtag Remover from Analyse Functions
<br>
<br>
3. Connect to your SQL database and upload the tweets into the table where you store them.

authors: Nthabeleng Vilakazi, Nelisiwe Mabanga

## Imports 

In [1]:
import tweepy           # To consume Twitter's API
import pandas as pd     # To handle data
import numpy as np      # For numerical computation
import json
# For displaying output in the notebook:
from IPython.display import display
import pyodbc           # To connect to the SQL database

## Consumer and Access details

Fill in the Consumer and Access details you should have received when applying for Twitter API access.

In [2]:
# Consumer:
CONSUMER_KEY    = 'xxxxxxx' 
CONSUMER_SECRET = 'xxxxxx'

# Access:
ACCESS_TOKEN  = 'xxxxxx'
ACCESS_SECRET = 'xxxxxx'
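
If you would rather not type the keys into the notebook, one option is to read them from environment variables. The sketch below is only a suggestion: the helper `load_credentials` and the four variable names are assumptions for this example, not something the Twitter API or tweepy requires.

```python
import os


def load_credentials(env=os.environ):
    """Read the four Twitter credentials from environment variables.

    The variable names below are hypothetical; use whatever names you
    exported in your own shell.
    """
    names = [
        "TWITTER_CONSUMER_KEY",
        "TWITTER_CONSUMER_SECRET",
        "TWITTER_ACCESS_TOKEN",
        "TWITTER_ACCESS_SECRET",
    ]
    # Fail early with a clear message if any variable is unset or empty.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in names)


# Possible usage once the variables are set:
# CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET = load_credentials()
```

This keeps the secrets out of the saved notebook, which matters if the notebook is shared or committed to version control.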

In [3]:
# API setup:
def twitter_setup():
    """
    Utility function to set up the Twitter API
    with access and consumer keys from Twitter.
    """

    # Authentication and access using keys:
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # Return API with authentication:
    api = tweepy.API(auth, timeout=1000)
    return api

## Function 1:

This function scrapes _"Eskom_SA"_ tweets from Twitter. 

Function Specifications:
- The function takes in the `consumer key`, `consumer secret code`, `access token`, and `access secret code`.
- The function returns a dataframe of the scraped tweets with just the "_Tweets_" and "_Date_" columns.

NOTE:
The dataframe should have the same column names as those in your SQL Database table where you store the tweets.

In [4]:
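def twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET):
    # Authenticate with the credentials passed in, rather than relying on globals.
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    extractor = tweepy.API(auth, timeout=1000)

    # Fetch up to 200 of the account's most recent tweets, excluding retweets.
    tweets = extractor.user_timeline(screen_name="eskom_sa", count=200, include_rts=False)

    # One row per tweet: its text and its creation timestamp.
    data = pd.DataFrame({'Tweets': [tweet.text for tweet in tweets],
                         'Date': [tweet.created_at for tweet in tweets]})
    return data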

## Function 2: Extracting hashtags and municipalities

Write a function that extracts the municipality and the hashtags from each tweet into their own columns in a new dataframe.

Function Specifications:
- The function should take in the pandas dataframe created in Function 1 and return a new pandas dataframe.

In [5]:
### START FUNCTION
def extract_municipality_hashtags(df):
    # Map municipality Twitter handles to municipality names.
    mun_dict = {
                '@CityofCTAlerts' : 'Cape Town',
                '@CityPowerJhb' : 'Johannesburg',
                '@eThekwiniM' : 'eThekwini' ,
                '@EMMInfo' : 'Ekurhuleni',
                '@centlecutility' : 'Mangaung',
                '@NMBmunicipality' : 'Nelson Mandela Bay',
                '@CityTshwane' : 'Tshwane'}
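    # Work on a copy so the input dataframe is left unchanged.
    df = df.copy()
    cities = []
    Tweets = df['Tweets']
    for x in Tweets:
        # Record the first matching municipality, or NaN when none is mentioned,
        # so that exactly one value is appended per tweet.
        match = np.nan
        for handle, city in mun_dict.items():
            if handle in x:
                match = city
                break
        cities.append(match)
    df['municipality'] = cities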
    
    hash_tags = []
    for x in Tweets:
        # Collect every word that starts with '#', lowercased.
        innerlist = [var.lower() for var in x.split() if var.startswith('#')]
        # Store the list of hashtags, or NaN when the tweet has none.
        hash_tags.append(innerlist if innerlist else np.nan)
    df['hashtags'] = hash_tags
    return df
### END FUNCTION
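
To check the matching rules without Twitter access, the same logic can be exercised on plain strings. This is a minimal sketch, assuming a two-entry subset of the handle mapping and invented sample tweets; the helper names `first_municipality` and `hashtags` are hypothetical, and `None` is used in place of `np.nan` so the example needs no third-party packages.

```python
# Subset of the handle-to-municipality mapping used in Function 2.
MUNICIPALITIES = {
    '@CityofCTAlerts': 'Cape Town',
    '@CityPowerJhb': 'Johannesburg',
}


def first_municipality(tweet, mapping=MUNICIPALITIES):
    """Return the municipality of the first handle (in mapping order) found in the tweet."""
    for handle, city in mapping.items():
        if handle in tweet:
            return city
    return None


def hashtags(tweet):
    """Return the lowercased words that start with '#', or None if there are none."""
    tags = [word.lower() for word in tweet.split() if word.startswith('#')]
    return tags or None


# Invented sample tweets covering one match, no match, and two handles.
samples = [
    "@CityPowerJhb #PowerAlert Stage 2 from 09:00",
    "No handle and no tag here",
    "@CityofCTAlerts and @CityPowerJhb both mentioned #Eskom #Loadshedding",
]
results = [(first_municipality(tweet), hashtags(tweet)) for tweet in samples]
```

The third sample shows why only one value is recorded per tweet: a tweet that mentions two handles must still produce exactly one entry, otherwise the column would no longer line up with the rows of the dataframe.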

## Function 3: Updating SQL Database with pyODBC

This function connects to your SQL database and updates it.

Function Specifications:
- The function should take in an open database connection, the pandas dataframe created in Function 2, and the name of the table to update.
- It should update the table where you store your tweets.
- It should not return any output.

In [6]:
def pyodbc_twitter(connection, df, twitter_table):
    # Table names cannot be passed as query parameters, so twitter_table is
    # interpolated into the SQL text; pass only a trusted table name.
    cur = connection.cursor()

    # Recreate the target table from scratch.
    cur.execute(f"DROP TABLE IF EXISTS {twitter_table};")
    cur.execute(
        f"""
        CREATE TABLE {twitter_table} ([Tweets] VARCHAR(300), [Date] VARCHAR(50), [municipality] VARCHAR(20), [hashtags] VARCHAR(100));
        """
    )

    # Insert one row per tweet, binding the values as query parameters.
    for index, row in df.iterrows():
        cur.execute(
            f"INSERT INTO {twitter_table} ([Tweets],[Date],[municipality],[hashtags]) VALUES (?,?,?,?)",
            str(row['Tweets']), str(row['Date']), str(row['municipality']), str(row['hashtags'])
        )

    # pyodbc connections do not autocommit by default, so commit before closing.
    connection.commit()
    cur.close()
    connection.close()
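
To try the load step without a SQL Server instance, the same drop, create, insert, and commit sequence can be run against an in-memory SQLite database. This is a sketch under stated assumptions: Python's built-in `sqlite3` module stands in for pyODBC, the helper `load_rows` is hypothetical, the column types are simplified to `TEXT`, and the two rows are invented sample data.

```python
import sqlite3


def load_rows(connection, rows, table_name="twitter_table"):
    """Recreate the table and insert the rows using bound parameters."""
    # table_name is interpolated into the SQL text, so it must be trusted.
    cur = connection.cursor()
    cur.execute(f"DROP TABLE IF EXISTS {table_name}")
    cur.execute(
        f"CREATE TABLE {table_name} "
        "(Tweets TEXT, Date TEXT, municipality TEXT, hashtags TEXT)"
    )
    cur.executemany(
        f"INSERT INTO {table_name} (Tweets, Date, municipality, hashtags) "
        "VALUES (?, ?, ?, ?)",
        rows,
    )
    # Make the inserts permanent before the cursor is released.
    connection.commit()
    cur.close()


connection = sqlite3.connect(":memory:")
rows = [
    ("@CityPowerJhb #PowerAlert Stage 2", "2020-01-10 08:00:00", "Johannesburg", "['#poweralert']"),
    ("Power restored", "2020-01-11 09:30:00", None, None),
]
load_rows(connection, rows)
stored = connection.execute(
    "SELECT municipality FROM twitter_table ORDER BY Date"
).fetchall()
```

Passing `None` for a missing value stores a SQL `NULL`, whereas converting a missing value with `str()` first stores the literal text `'nan'`; which of the two you want depends on how the table will be queried later.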