## Complete pipeline: from hydrated JSONL files to the combined state-level DataFrame across multiple dates

- !pip install dask
- !pip install geopy
- !pip install vaderSentiment

In [7]:
# Raw strings are used for every Windows path so that backslashes are never
# interpreted as escape sequences (e.g. a folder starting with "t" or "n"
# would otherwise silently become a tab / newline).
JSONL_DIR = r"C:\Data Science\Jupyter_Workspace\Twitter_Sentiment\Data\JSONLs"

# Dates (YYYYMMDD) whose hydrated JSONL files have already been processed.
# Order is kept exactly as originally recorded.
_DONE_DATES = [
    "20200320", "20200325", "20200327", "20200405", "20200501",
    "20200517", "20200608", "20200701", "20200907", "20200921",
    "20200930", "20210301", "20210116", "20210323", "20210401",
]
JSONL_done = [JSONL_DIR + "\\" + date + ".jsonl" for date in _DONE_DATES]

# Files still to be processed; fill this in before running the pipeline cell.
JSON_willDo = []
# NOTE(review): the figures below look like per-file run times; units were not recorded.
# 1,2: 210 | 3: 74 | 4: 180 | 5: 30 | 6: 120 | 7: 45 | 8: 45 | 9: 28 | 10:18 | 11: 16 | 12: 160 | 

# Dask Error (files that failed while loading; not yet processed)
# JSONL_DIR + r"\20200330.jsonl"
# JSONL_DIR + r"\20200414.jsonl"
# JSONL_DIR + r"\20200801.jsonl"
# JSONL_DIR + r"\20200826.jsonl"

# CSV that accumulates the per-state, per-date average sentiment scores.
CSVpath = r"C:\Data Science\GitHub Projects\Twitter-Sentiment-Analysis\final_df.csv"

In [8]:
def sentiment_pipeline(JSONL_list, CSVpath):
    """Score Indian tweets per state for each JSONL file and merge into a CSV.

    For every hydrated-tweet JSONL file in ``JSONL_list`` the pipeline:

    1. keeps tweets whose ``place.country`` is ``'India'`` and reverse-geocodes
       the first corner of the place bounding box to a state name (Nominatim);
    2. cleans the tweet text and computes the VADER ``compound`` score;
    3. averages the score per state, renames states to the GeoJSON keys and
       drops states that are not in the known list;
    4. tags the rows with the date parsed from the file name (``<digits>.jsonl``).

    The resulting rows are appended to whatever is already stored in
    ``CSVpath`` (if the file exists) and the combined table is written back.
    Re-running the same file appends its rows again; nothing is de-duplicated.

    Parameters
    ----------
    JSONL_list : list of str
        Paths of JSONL files, one tweet JSON object per line. Each file name
        must contain the date as digits directly before ``.jsonl``.
    CSVpath : str
        CSV file with columns ``state``, ``vader_score``, ``date``. It is read
        if present and always (re)written at the end.

    Raises
    ------
    ValueError
        If a file name does not contain ``<digits>.jsonl``.
    """
    # importing libraries (kept function-local, as in the original cell)
    import json
    import os
    import re
    from random import randint

    import dask.bag as db
    import pandas as pd
    from geopy.extra.rate_limiter import RateLimiter
    from geopy.geocoders import Nominatim
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

    tweet_columns = ['id', 'longitude', 'latitude', 'text']
    result_columns = ['state', 'vader_score', 'date']

    # geocoder names -> GeoJSON keys
    state_renames = {"Jammu and Kashmir": "Jammu & Kashmir",
                     "Dadra and Nagar Haveli and Daman and Diu": "Dadara & Nagar Havelli",
                     "Arunachal Pradesh": "Arunanchal Pradesh",
                     "Delhi": "NCT of Delhi"}
    state_list = ['Andaman & Nicobar Island', 'Andhra Pradesh', 'Arunanchal Pradesh', 'Assam', 'Bihar', 'Chandigarh', 'Chhattisgarh',
                  'Dadara & Nagar Havelli', 'Daman & Diu', 'Goa', 'Gujarat', 'Haryana', 'Himachal Pradesh', 'Jammu & Kashmir',
                  'Jharkhand', 'Karnataka', 'Kerala', 'Lakshadweep', 'Madhya Pradesh', 'Maharashtra', 'Manipur', 'Meghalaya', 'Mizoram',
                  'NCT of Delhi', 'Nagaland', 'Odisha', 'Puducherry', 'Punjab', 'Rajasthan', 'Sikkim', 'Tamil Nadu', 'Telangana', 'Tripura',
                  'Uttar Pradesh', 'Uttarakhand', 'West Bengal']

    # One geocoder and ONE rate limiter for the whole run: the limiter only
    # enforces min_delay_seconds between calls made through the same instance,
    # so building a fresh one per row (as before) disabled the throttling.
    geolocator = Nominatim(user_agent='user_me_{}'.format(randint(10000, 99999)))
    reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)
    analyser = SentimentIntensityAnalyzer()

    # (longitude, latitude) -> state; many tweets share the same place box,
    # so each distinct coordinate pair is looked up only once.
    state_cache = {}

    def flatten(tweet):
        """Keep only the fields needed downstream."""
        corner = tweet['place']['bounding_box']['coordinates'][0][0]
        return {
            'id': tweet['id_str'],
            'longitude': corner[0],
            'latitude': corner[1],
            'text': tweet['full_text']
        }

    def getState(longitude, latitude):
        """Reverse-geocode a coordinate pair to a state name, or None."""
        key = (longitude, latitude)
        if key in state_cache:
            return state_cache[key]
        location = reverse(str(latitude) + "," + str(longitude), language='en', exactly_one=True)
        if location is None:
            # Failed / empty lookup: do not cache, so a later row may retry.
            return None
        state = location.raw.get('address', {}).get('state')
        state_cache[key] = state
        return state

    def preprocess_for_vader(tweet):
        """Strip retweet handles, @mentions and URLs from the tweet text."""
        cleaned_text = re.sub(r'RT @[\w]*:', '', tweet)  # removing RT handles
        cleaned_text = re.sub(r'@[\w]*', '', cleaned_text)  # removing all @mentions
        # removing URL links; the character class must be "non-whitespace"
        # ([^\s]), not "anything but the letter s", and the dot is literal.
        cleaned_text = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', ' ', cleaned_text)
        # NOTE(review): the original followed this with
        #   np.core.defchararray.replace(text, "[^a-zA-Z#]", " ")
        # which performs a LITERAL substring replacement (not a regex), so it
        # never removed anything. It is dropped here to keep the effective
        # behaviour; VADER also scores punctuation and emoticons, so confirm
        # before stripping them with re.sub(r'[^a-zA-Z#]', ' ', ...).
        return cleaned_text

    def date_from_path(path):
        """Return the integer date embedded in '<digits>.jsonl'."""
        match = re.search(r"(\d+)\.jsonl", path)
        if match is None:
            raise ValueError("Cannot find '<digits>.jsonl' in file name: {!r}".format(path))
        return int(match.group(1))

    # Start from the rows already stored, if any (first run: no CSV yet).
    existing_df = pd.read_csv(CSVpath) if os.path.exists(CSVpath) else pd.DataFrame(columns=result_columns)
    frames = [existing_df] if not existing_df.empty else []

    for hydratedJSONL in JSONL_list:
        # Parse the date first so a badly named file fails before the
        # (slow) geocoding work rather than after it.
        date = date_from_path(hydratedJSONL)

        #STEP ONE: Extract Indian tweets with location
        # 1.loading the big JSONL into DASK for parallelization
        records = (
            db.read_text(hydratedJSONL)
            .map(json.loads)
            .filter(lambda tweet: tweet['place'] is not None and tweet['place']['country'] == 'India')
            .map(flatten)
            .compute()
        )
        # Building the frame from the computed records with explicit columns
        # avoids Bag.to_dataframe()'s schema inference, which fails when no
        # tweet passes the filter.
        ind_df = pd.DataFrame(records, columns=tweet_columns)
        if ind_df.empty:
            continue

        # 2.extracting State through coordinates
        ind_df["state"] = [getState(lon, lat) for lon, lat in zip(ind_df["longitude"], ind_df["latitude"])]

        # 3.removing rows with null State values (.copy() so the columns
        #   added below go to an independent frame, not a view of ind_df)
        df_stepOne = ind_df[ind_df['state'].notnull()].copy()

        # --------------------------------------------------------
        #STEP TWO: Getting Sentiment Scores of Tweet texts with VADER
        df_stepOne["vader_text"] = df_stepOne["text"].apply(preprocess_for_vader)
        df_stepOne["vader_score"] = df_stepOne["vader_text"].apply(lambda x: analyser.polarity_scores(x)["compound"])

        # --------------------------------------------------------
        #STEP THREE: Getting average State Score
        # 1.grouping to get the score for each state
        df_stepThree = df_stepOne.groupby('state', as_index=False)['vader_score'].mean()

        # 2.correcting state names to match with GeoJSON Keys
        #   (assignment instead of chained inplace=True, which is deprecated
        #   and has no effect under pandas copy-on-write)
        df_stepThree["state"] = df_stepThree["state"].replace(state_renames)

        # 3.removing rows for invalid states
        df_stepFour = df_stepThree[df_stepThree["state"].isin(state_list)].copy()

        # --------------------------------------------------------
        #STEP FOUR: Adding the date column and collecting each day's data
        df_stepFour["date"] = date
        if not df_stepFour.empty:
            frames.append(df_stepFour)

    #----------------------------------------------------------------
    # Single concat at the end (DataFrame.append was removed in pandas 2.0
    # and appending inside the loop re-copies the data on every iteration).
    if frames:
        final_df = pd.concat(frames, ignore_index=True)
    else:
        final_df = existing_df

    #exporting DF to CSV
    final_df.to_csv(CSVpath, index=False)

In [9]:
# Run the pipeline on the pending files and merge their scores into the CSV.
sentiment_pipeline(JSONL_list=JSON_willDo, CSVpath=CSVpath)

RateLimiter caught an error, retrying (0/2 tries). Called with (*('28.443981,77.302126',), **{'language': 'en', 'exactly_one': True}).
Traceback (most recent call last):
  File "C:\Users\KIIT\miniconda3\lib\site-packages\urllib3\connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "C:\Users\KIIT\miniconda3\lib\site-packages\urllib3\connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "C:\Users\KIIT\miniconda3\lib\http\client.py", line 1344, in getresponse
    response.begin()
  File "C:\Users\KIIT\miniconda3\lib\http\client.py", line 306, in begin
    version, status, reason = self._read_status()
  File "C:\Users\KIIT\miniconda3\lib\http\client.py", line 267, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "C:\Users\KIIT\miniconda3\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
  File "C:\Users\KIIT\miniconda3\lib