### Assignment 2 - Week 3
### Create a pyspark script which will read data from particular table from Mysql/Postgres database
### Perform some operations/transformation on the data and save the result to local file storage

In [1]:
# Note - Launched pyspark with below command
# pyspark --driver-class-path .\postgresql-42.2.18.jar --jars .\postgresql-42.2.18.jar

from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.driver.extraClassPath', "postgresql-42.2.18.jar").getOrCreate()
url = 'jdbc:postgresql://127.0.0.1/postgres'
properties = {'user': 'postgres', 'password': 'n0ob007'}
emp_df = spark.read.jdbc(url=url, table='Emp', properties=properties)

In [8]:
emp_df.show(100)

+-------+-----------------+-----------+
|empname|   empdesignation|emplocation|
+-------+-----------------+-----------+
|   John|Software Engineer| California|
|   Wade|    Data Engineer|    Toronto|
| Vishal|    Data Engineer|     Mumbai|
|   Adam|    Test Engineer|   New York|
+-------+-----------------+-----------+



### Converting Dataframe into RDD

In [3]:
emp_df.rdd.collect()

[Row(empname='John', empdesignation='Software Engineer', emplocation='California'),
 Row(empname='Wade', empdesignation='Data Engineer', emplocation='Toronto'),
 Row(empname='Vishal', empdesignation='Data Engineer', emplocation='Mumbai'),
 Row(empname='Adam', empdesignation='Test Engineer', emplocation='New York')]

In [4]:
dd = sc.parallelize(emp_df.rdd.map(lambda row: row.asDict()).collect())

In [5]:
dd.collect()

[{'empname': 'John',
  'empdesignation': 'Software Engineer',
  'emplocation': 'California'},
 {'empname': 'Wade',
  'empdesignation': 'Data Engineer',
  'emplocation': 'Toronto'},
 {'empname': 'Vishal',
  'empdesignation': 'Data Engineer',
  'emplocation': 'Mumbai'},
 {'empname': 'Adam',
  'empdesignation': 'Test Engineer',
  'emplocation': 'New York'}]

In [6]:
dd.collect()[0]['empdesignation']

'Software Engineer'

### Another way to convert Dataframe into RDD and then convert RDD into list of dict

In [7]:
emp_df.rdd.map(lambda row: row.asDict()).collect()

[{'empname': 'John',
  'empdesignation': 'Software Engineer',
  'emplocation': 'California'},
 {'empname': 'Wade',
  'empdesignation': 'Data Engineer',
  'emplocation': 'Toronto'},
 {'empname': 'Vishal',
  'empdesignation': 'Data Engineer',
  'emplocation': 'Mumbai'},
 {'empname': 'Adam',
  'empdesignation': 'Test Engineer',
  'emplocation': 'New York'}]

### Adding new column and transforming column

In [9]:
import pyspark.sql.functions as F
# new_emp_df = emp_df.withColumnRenamed('emplocation','emp_location')
# new_emp_df = new_emp_df.withColumnRenamed('empdesignation','emp_designation')
# new_emp_df = new_emp_df.withColumnRenamed('empname','emp_name')
new_emp_df = emp_df.select(
    F.col('emplocation').alias('emp_location'),F.col('empdesignation').alias('emp_designation'),F.col('empname').alias('emp_name')
)

In [10]:
new_emp_df.show()

+------------+-----------------+--------+
|emp_location|  emp_designation|emp_name|
+------------+-----------------+--------+
|  California|Software Engineer|    John|
|     Toronto|    Data Engineer|    Wade|
|      Mumbai|    Data Engineer|  Vishal|
|    New York|    Test Engineer|    Adam|
+------------+-----------------+--------+



In [11]:
def extract_initials(col):
    splitted_data = str(col).strip().split()
    return ''.join([str(i[0]).upper() for i in splitted_data])

In [12]:
extract_initials("Data Engineer")

'DE'

In [13]:
import pyspark.sql.functions as sqlFunc
import pyspark.sql.types as sqlTypes
usr_def_func = sqlFunc.UserDefinedFunction(extract_initials,sqlTypes.StringType())
transformed_emp_df = new_emp_df.withColumn('emp_desig_sf',usr_def_func('emp_designation'))
transformed_emp_df.show(100)

+------------+-----------------+--------+------------+
|emp_location|  emp_designation|emp_name|emp_desig_sf|
+------------+-----------------+--------+------------+
|  California|Software Engineer|    John|          SE|
|     Toronto|    Data Engineer|    Wade|          DE|
|      Mumbai|    Data Engineer|  Vishal|          DE|
|    New York|    Test Engineer|    Adam|          TE|
+------------+-----------------+--------+------------+



In [15]:
transformed_emp_df.toPandas().to_csv("Transformed_emp_df.csv",header=True,index= False)

### Assignment 3 - Week 3
### MoviLens dataset
### Dataset url:https://grouplens.org/datasets/movielens/
### Use PySpark for the followings
### Find the Most Popular Movie

In [16]:
import pandas as pd
links_df = pd.read_csv("./ml-latest-small/links.csv")
movies_df = pd.read_csv("./ml-latest-small/movies.csv")
ratings_df = pd.read_csv("./ml-latest-small/ratings.csv")
tags_df = pd.read_csv("./ml-latest-small/tags.csv")

In [17]:
links_df.head()

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0
3,4,114885,31357.0
4,5,113041,11862.0


In [18]:
movies_df.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [19]:
ratings_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [20]:
tags_df.head()

Unnamed: 0,userId,movieId,tag,timestamp
0,2,60756,funny,1445714994
1,2,60756,Highly quotable,1445714996
2,2,60756,will ferrell,1445714992
3,2,89774,Boxing story,1445715207
4,2,89774,MMA,1445715200


### Top 25 most Rated movies

In [21]:
# Merging 2 dataframe (movies and ratings)
movies_ratings = pd.merge(movies_df,ratings_df)

In [22]:
movies_ratings.head()

Unnamed: 0,movieId,title,genres,userId,rating,timestamp
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1,4.0,964982703
1,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,5,4.0,847434962
2,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,7,4.5,1106635946
3,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,15,2.5,1510577970
4,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,17,4.5,1305696483


### PANDAS : First way to find most rated movies by using Groupby

In [24]:
movies_ratings.groupby('title').size().to_frame().rename(columns={0:'Count'}).sort_values(by='Count',ascending=False).head(25)

Unnamed: 0_level_0,Count
title,Unnamed: 1_level_1
Forrest Gump (1994),329
"Shawshank Redemption, The (1994)",317
Pulp Fiction (1994),307
"Silence of the Lambs, The (1991)",279
"Matrix, The (1999)",278
Star Wars: Episode IV - A New Hope (1977),251
Jurassic Park (1993),238
Braveheart (1995),237
Terminator 2: Judgment Day (1991),224
Schindler's List (1993),220


### PANDAS : 2nd way to find most rated movies by values_count() method

In [26]:
movies_ratings['title'].value_counts().head(25)

Forrest Gump (1994)                                                               329
Shawshank Redemption, The (1994)                                                  317
Pulp Fiction (1994)                                                               307
Silence of the Lambs, The (1991)                                                  279
Matrix, The (1999)                                                                278
Star Wars: Episode IV - A New Hope (1977)                                         251
Jurassic Park (1993)                                                              238
Braveheart (1995)                                                                 237
Terminator 2: Judgment Day (1991)                                                 224
Schindler's List (1993)                                                           220
Fight Club (1999)                                                                 218
Toy Story (1995)                                      

### In Pyspark
### 1 : Read both the csv(movies.csv and ratings.csv)
### 2 : Merge both the csv using .join method
### 3 : converting dataframe into sql temp table and query via sql

In [27]:
spark_movies_df = spark.read.csv("./ml-latest-small/movies.csv",header=True)
spark_ratings_df = spark.read.csv("./ml-latest-small/ratings.csv",header=True)

In [28]:
spark_movies_df.show(5)
spark_ratings_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [29]:
spark_movies_ratings_df = spark_movies_df.join(spark_ratings_df,on=['movieId'])

In [30]:
spark_movies_ratings_df.show(10)

+-------+--------------------+--------------------+------+------+---------+
|movieId|               title|              genres|userId|rating|timestamp|
+-------+--------------------+--------------------+------+------+---------+
|      1|    Toy Story (1995)|Adventure|Animati...|     1|   4.0|964982703|
|      3|Grumpier Old Men ...|      Comedy|Romance|     1|   4.0|964981247|
|      6|         Heat (1995)|Action|Crime|Thri...|     1|   4.0|964982224|
|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|     1|   5.0|964983815|
|     50|Usual Suspects, T...|Crime|Mystery|Thr...|     1|   5.0|964982931|
|     70|From Dusk Till Da...|Action|Comedy|Hor...|     1|   3.0|964982400|
|    101|Bottle Rocket (1996)|Adventure|Comedy|...|     1|   5.0|964980868|
|    110|   Braveheart (1995)|    Action|Drama|War|     1|   4.0|964982176|
|    151|      Rob Roy (1995)|Action|Drama|Roma...|     1|   5.0|964984041|
|    157|Canadian Bacon (1...|          Comedy|War|     1|   5.0|964984100|
+-------+---

### 1 way, via merging df and group by on dataframe

In [31]:
from pyspark.sql.functions import desc,col
# spark_movies_ratings_df.groupBy('title').count().orderBy('count',ascending=False).show(25)
spark_movies_ratings_df.groupBy('title').count().sort(col('count').desc()).show(25)

+--------------------+-----+
|               title|count|
+--------------------+-----+
| Forrest Gump (1994)|  329|
|Shawshank Redempt...|  317|
| Pulp Fiction (1994)|  307|
|Silence of the La...|  279|
|  Matrix, The (1999)|  278|
|Star Wars: Episod...|  251|
|Jurassic Park (1993)|  238|
|   Braveheart (1995)|  237|
|Terminator 2: Jud...|  224|
|Schindler's List ...|  220|
|   Fight Club (1999)|  218|
|    Toy Story (1995)|  215|
|Star Wars: Episod...|  211|
|Usual Suspects, T...|  204|
|American Beauty (...|  204|
|Seven (a.k.a. Se7...|  203|
|Independence Day ...|  202|
|    Apollo 13 (1995)|  201|
|Raiders of the Lo...|  200|
|Lord of the Rings...|  198|
|Star Wars: Episod...|  196|
|Godfather, The (1...|  192|
|Fugitive, The (1993)|  190|
|       Batman (1989)|  189|
|Saving Private Ry...|  188|
+--------------------+-----+
only showing top 25 rows



### 2nd way via SQL

In [32]:
spark_movies_ratings_df.registerTempTable("movies_ratings_table")

In [33]:
sqlContext.sql("select title,COUNT('title') from movies_ratings_table group by title order by 2 desc").show(25)

+--------------------+------------+
|               title|count(title)|
+--------------------+------------+
| Forrest Gump (1994)|         329|
|Shawshank Redempt...|         317|
| Pulp Fiction (1994)|         307|
|Silence of the La...|         279|
|  Matrix, The (1999)|         278|
|Star Wars: Episod...|         251|
|Jurassic Park (1993)|         238|
|   Braveheart (1995)|         237|
|Terminator 2: Jud...|         224|
|Schindler's List ...|         220|
|   Fight Club (1999)|         218|
|    Toy Story (1995)|         215|
|Star Wars: Episod...|         211|
|American Beauty (...|         204|
|Usual Suspects, T...|         204|
|Seven (a.k.a. Se7...|         203|
|Independence Day ...|         202|
|    Apollo 13 (1995)|         201|
|Raiders of the Lo...|         200|
|Lord of the Rings...|         198|
|Star Wars: Episod...|         196|
|Godfather, The (1...|         192|
|Fugitive, The (1993)|         190|
|       Batman (1989)|         189|
|Saving Private Ry...|      

### Assignment 4 
### Spark Streaming
### Real-time Monitoring of the Most Popular Hashtags on Twitter

In [34]:
import tweepy
from tweepy.auth import OAuthHandler 
from tweepy import Stream
from tweepy.streaming import StreamListener
import json

In [1]:
# Tutorial followed
# https://towardsdatascience.com/sentiment-analysis-on-streaming-twitter-data-using-spark-structured-streaming-python-fc873684bfe3

consumer_key = "Your Key"
consumer_secret = "Your Key"
access_token = "Your Key"
access_secret = "Your Key"

In [39]:
class TweetStreamer():
    
    def stream_tweets(self, topic_to_search):
        tweetListner = TweetListner()
        auth = OAuthHandler(consumer_key,consumer_secret)
        auth.set_access_token(access_token,access_secret)

        stream = Stream(auth,tweetListner)
        stream.filter(track=topic_to_search)


class TweetListner(StreamListener):
    import pandas as pd    
    from collections import namedtuple
    fields = ("tags",'count')
    Tweets_count = namedtuple('Tweeet',fields)
    final_hashtags_list = []
    
    # Database connection section
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.config('spark.driver.extraClassPath', "postgresql-42.2.18.jar").getOrCreate()
    url = 'jdbc:postgresql://127.0.0.1/postgres'
    properties = {'user': 'postgres', 'password': 'n0ob007'}
    df = spark.read.jdbc(url=url, table='popular_hashtags', properties=properties)
    df.show()
    
    # Variables for pyspark
    from pyspark.sql.types import StructField,StringType,StructType
    field = [
        StructField("hashtags", StringType(), True),
    ]
    schema = StructType(field)
    hashtags_spark_df = spark.createDataFrame(sc.emptyRDD(),schema)
    
    counter = 0
    
    def on_data(self,data):
        try:
# #-------------Using Pandas---------------------
#             # This piece of code, extract hashtags directly from the tweets
#             hashTags = json.loads(data).get('entities').get('hashtags')
#             if len(hashTags)>0:
#                 final_hashtags = hashTags[0].get('text')
#                 print(final_hashtags)
                            
#                 self.final_hashtags_list.append(final_hashtags)
#                 print(self.pd.Series(self.final_hashtags_list).value_counts().to_frame().rename(columns={0:'Count'}).head(10))
#                 print("-"*50)
# #-------------Till here pandas---------------------

#-----------Using PySpark-----------------------
            # This piece of code, just receive tweets in str and converts it into json and extracts text using key(text)
            # Tweet text is then converted in to RDD and only #tags are extracted.
            # After extracting hashtags, I am forming a pyspark dataframe and inserting it into database(postgresql. Table[popular_hashtags])
            detailed_tweet = json.loads(data).get('text')
#             print(detailed_tweet)
#             print("Exxxx",'extended_tweet' in detailed_tweet)
#             if 'extended_tweet' in detailed_tweet:
#                 tweet_full_text = detailed_tweet['extended_tweet']['full_text'].encode('utf-8')
#             else:
#                 tweet_full_text = detailed_tweet['text'].encode('utf-8')
#             print(tweet_full_text)
            twt_rdd = sc.parallelize([detailed_tweet])
            print("RDD : ",twt_rdd.collect())
            extracted_hashtags = twt_rdd.flatMap(lambda lines: lines.split("\n")).flatMap(lambda each_lines:each_lines.split(" ")).filter(lambda x:x.lower().startswith("#"))
#             extracted_hashtags = twt_rdd.flatMap(lambda lines: self.split_lines_by(lines,"\n")).collect()
#             flatMap(lambda lines: self.split_lines_by(lines,"\n"))
#             .flatMap(lambda l:self.split_lines_by(l," ")).filter(lambda x:x.lower().startswith("#"))
            if extracted_hashtags.count()>0:
                print('Count of hashtags :',extracted_hashtags.count())
#                     [print(_ele) for _ele in extracted_hashtags.collect()]
                for _ele in extracted_hashtags.collect():
                    print("_ele --> ",_ele)
                    row = [[_ele]]
                    new_df = spark.createDataFrame(row)
                    # new_df.show()
                    self.hashtags_spark_df = self.hashtags_spark_df.union(new_df)
#                     self.hashtags_spark_df.show()
            self.hashtags_spark_df.show()
            print("-"*50)
        
            # At this line, if the pyspark dataframe count is divisible by 2 then insert in append mode in DB
            if self.hashtags_spark_df.count() > self.counter and self.hashtags_spark_df.count()%2==0:
                self.insert_into_db()
                self.counter = self.hashtags_spark_df.count() + 1
# -------------------------till here pyspark------------------------
            return True
        except Exception as e:
            print("Exception Caught while reading the data : ",str(e))
        return True
    
    def on_error(self, status):
        print("Error Found : ",status)
        return True

    def insert_into_db(self):
        print("Inside db method : ",self.hashtags_spark_df.count())
        self.hashtags_spark_df.write\
            .format("jdbc")\
            .mode("append")\
            .option("truncate","true")\
            .option("url",self.url)\
            .option("dbtable","popular_hashtags")\
            .option("user","postgres")\
            .option("password","n0ob007").save()

    def split_lines_by(lines,by):
        print("inside split by lines : ",lines.split(by))
        return lines.split(by)

+---------------------------+
|                   hashtags|
+---------------------------+
|             #PMCBankCrisis|
|                #SushantDay|
|#NUEST_JR_아론_백호_민현_렌|
|                  #뉴이스트|
|                     #NUEST|
|          #BoycottBollywood|
|                #SushantDay|
+---------------------------+



In [40]:
topic_to_search = ['Modi','SSR','football','cricket','Bollywood','Elon musk']
TweetStreamer().stream_tweets(topic_to_search)

RDD :  ['RT @mr_mayank: So Modi has openly called protesters as "Andolan Jivi" and said we need to save our nation from them.\n\nBut in reality, it is…']
+--------+
|hashtags|
+--------+
+--------+

--------------------------------------------------
RDD :  ['RT @ShaneWarne: What is going on with your cricket team ? What on Earth are they doing just letting the game drift ? Why aren’t they bowlin…']
+--------+
|hashtags|
+--------+
+--------+

--------------------------------------------------
RDD :  ['RT @ishkarnBHANDARI: PM Modi is definitely the best orator in Bharat. \n\nFrom Humour, Sarcasm, Factual rebuttal to setting narratives he sin…']
+--------+
|hashtags|
+--------+
+--------+

--------------------------------------------------
RDD :  ['RT @kamaalrkhan: Abki Baar Trump Sarkar! Ye Nara Diya Tha Modi Ji Ne USA main. Modi did campaign for #Trump! So if #Rihanna will campaign f…']
Count of hashtags : 2
_ele -->  #Trump!
_ele -->  #Rihanna
+--------+
|hashtags|
+--------+
| #Trump

KeyboardInterrupt: 

### Analysing popular hashtags by reading data from DB

In [41]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.driver.extraClassPath', "postgresql-42.2.18.jar").getOrCreate()
url = 'jdbc:postgresql://127.0.0.1/postgres'
properties = {'user': 'postgres', 'password': 'n0ob007'}
popular_hashtags_df = spark.read.jdbc(url=url, table='popular_hashtags', properties=properties)
popular_hashtags_df.show()

# Grouping by hashtags
from pyspark.sql.functions import desc,col
popular_hashtags_df.groupBy('hashtags').count().sort(col('count').desc()).show(25)

+---------------------------+
|                   hashtags|
+---------------------------+
|             #PMCBankCrisis|
|                #SushantDay|
|#NUEST_JR_아론_백호_민현_렌|
|                  #뉴이스트|
|                     #NUEST|
|          #BoycottBollywood|
|                #SushantDay|
|                    #Trump!|
|                   #Rihanna|
|                    #Trump!|
|                   #Rihanna|
|               #KatrinaKaif|
|                #SalmanKhan|
+---------------------------+

+---------------------------+-----+
|                   hashtags|count|
+---------------------------+-----+
|                #SushantDay|    2|
|                   #Rihanna|    2|
|                    #Trump!|    2|
|                  #뉴이스트|    1|
|                     #NUEST|    1|
|                #SalmanKhan|    1|
|          #BoycottBollywood|    1|
|               #KatrinaKaif|    1|
|#NUEST_JR_아론_백호_민현_렌|    1|
|             #PMCBankCrisis|    1|
+---------------------------+-----+

