### PySpark Assignment
#### Group 13



### ETL using PySpark

#### Extract
- Read input from different data sources: JSON, CSV, Parquet
- Read from in-memory data structures: DataFrames, arrays, and others

#### Transform
- Data preprocessing and cleaning
- RDD transformations: map, filter
- DataFrame functions: groupBy, filter, and so on

#### Load
- Store as a file
- Store to a database

## Entertainment: Netflix shows analytics

i. Extract: Load the data

   - Read the data as a pandas DataFrame, then create a Spark DataFrame and register it as a Spark SQL table view named "netflix"

ii. Transform: Exploratory data analysis using Spark SQL queries

   - Unique show_id count
   - Group by type and release_year, and count show_id
   - Update the duration column values: "90 min" becomes 90, "2 Seasons" becomes 2, and so on
   - Group by type and average the durations

iii. Load: Save analysis report

   - Save as tables, partitioned by type

In [1]:
# Import required libraries
import pandas as pd
import pyspark
# Import only the column helper used below; a wildcard import from
# pyspark.sql.functions would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import DoubleType, IntegerType, DateType


In [2]:
# Create the SparkContext and SQLContext (local mode)
conf = SparkConf().setAppName("test").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
print(sc)
print(sqlContext)

<SparkContext master=local appName=test>
<pyspark.sql.context.SQLContext object at 0x7f57c510b1f0>
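
On Spark 2.0 and later, `SparkSession` is the usual single entry point and gives access to both the context and the SQL functionality used in this notebook. The following is a minimal sketch, assuming PySpark 2.0+ is installed; the helper name `get_spark` and the `local[*]` master are illustrative choices, not part of the original notebook.

```python
APP_NAME = "test"    # same application name as the cell above
MASTER = "local[*]"  # assumption: use all local cores (the notebook uses "local")


def get_spark(app_name=APP_NAME, master=MASTER):
    """Return a SparkSession, reusing an existing one if present (hypothetical helper)."""
    # Imported lazily so this module still loads where PySpark is not installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .master(master)
        .getOrCreate()
    )
```

With this in place, `spark = get_spark()` followed by `spark.sql(...)` and `spark.read` can stand in for the `sqlContext.sql(...)` and `sqlContext.read` calls, and `spark.sparkContext` still exposes the underlying `SparkContext`.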


### i. Load the data

#### Read Netflix data as a pandas DataFrame

In [3]:
netflix = pd.read_csv("./netflix_titles.csv")

In [4]:
netflix.head()

,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm..."
1,s2,TV Show,Blood & Water,,"Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...",South Africa,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, TV Dramas, TV Mysteries","After crossing paths at a party, a Cape Town t..."
2,s3,TV Show,Ganglands,Julien Leclercq,"Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...",,"September 24, 2021",2021,TV-MA,1 Season,"Crime TV Shows, International TV Shows, TV Act...",To protect his family from a powerful drug lor...
3,s4,TV Show,Jailbirds New Orleans,,,,"September 24, 2021",2021,TV-MA,1 Season,"Docuseries, Reality TV","Feuds, flirtations and toilet talk go down amo..."
4,s5,TV Show,Kota Factory,,"Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...",India,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, Romantic TV Shows, TV ...",In a city of coaching centers known to train I...


In [5]:
netflix.shape

(8807, 12)

In [6]:
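# Note: `unique` is referenced without parentheses, so the output below is the
# bound method's repr rather than the unique values;
# netflix['show_id'].nunique() would return the distinct count
netflix['show_id'].unique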

<bound method Series.unique of 0          s1
1          s2
2          s3
3          s4
4          s5
        ...  
8802    s8803
8803    s8804
8804    s8805
8805    s8806
8806    s8807
Name: show_id, Length: 8807, dtype: object>

#### Convert Pandas to PySpark (Spark) DataFrame

In [7]:
# netflix_spark = sqlContext.createDataFrame(netflix)

Creating the Spark DataFrame directly from the pandas DataFrame (the commented-out call above) is unreliable here. Columns with missing values, such as director, cast, and country, are stored by pandas as mixed types (strings for present values, NaN for missing ones), which can cause Spark's type inference to fail.

When in doubt, read all data as strings and cast afterwards. If a code book describing the columns is available, provide an explicit schema to avoid such problems and to skip the cost of schema inference.

Finally, passing data through the driver is an anti-pattern. The file can be read directly with the built-in csv format (Spark 2.0.0+) or with the spark-csv library (Spark 1.6 and below).
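
The advice above (read everything as strings, supply a schema, read the CSV directly) can be sketched as follows. This is only an illustration: the helper names `netflix_schema` and `read_netflix` are made up here, the column list is copied from the pandas output earlier in the notebook, and `spark` is assumed to be an existing `SparkSession`.

```python
# Column names as they appear in netflix_titles.csv (taken from the pandas output).
NETFLIX_COLUMNS = [
    "show_id", "type", "title", "director", "cast", "country",
    "date_added", "release_year", "rating", "duration",
    "listed_in", "description",
]


def netflix_schema():
    """Build an all-string schema so no type inference pass is needed."""
    # Lazy import: keeps the module importable without PySpark installed.
    from pyspark.sql.types import StructType, StructField, StringType

    return StructType(
        [StructField(name, StringType(), True) for name in NETFLIX_COLUMNS]
    )


def read_netflix(spark, path="./netflix_titles.csv"):
    """Read the CSV straight into Spark with the explicit schema (hypothetical helper)."""
    return spark.read.csv(path, header=True, schema=netflix_schema())
```

Individual columns can then be cast where needed, for example `df.withColumn("release_year", col("release_year").cast("int"))`.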

In [8]:
## Create a Spark DataFrame from the Netflix CSV (all columns are read as strings)
netflix_spark_df = (sqlContext.read.format("csv").options(header="true")
    .load("./netflix_titles.csv"))

In [9]:
netflix_spark_df

DataFrame[show_id: string, type: string, title: string, director: string, cast: string, country: string, date_added: string, release_year: string, rating: string, duration: string, listed_in: string, description: string]

In [10]:
## print schema of netflix data
netflix_spark_df.printSchema()


root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [11]:
## Show data 
netflix_spark_df.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

### Register the Netflix Spark DataFrame as a table

In [12]:
sqlContext.registerDataFrameAsTable(netflix_spark_df, "netflix_table")

In [13]:
## Query the table (this returns a lazy DataFrame; nothing is displayed until show() is called)
sqlContext.sql("SELECT * FROM netflix_table ")

DataFrame[show_id: string, type: string, title: string, director: string, cast: string, country: string, date_added: string, release_year: string, rating: string, duration: string, listed_in: string, description: string]

In [14]:

## Query the table and display the Netflix data
netflix_raw_data = sqlContext.sql("SELECT * FROM netflix_table ")
netflix_raw_data.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

### ii. Transform: Exploratory data analysis using Spark SQL queries

- Unique show_id count
- Group by type and release_year, and count show_id
- Update the duration column values: "90 min" becomes 90, "2 Seasons" becomes 2, and so on
- Group by type and average the durations

In [15]:
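## Count of non-null show_id values using Spark SQL.
## Note: DISTINCT is applied to the single count result, so this is not a distinct count of show_id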
unique_id_count = sqlContext.sql("select distinct cnt_id from (select count(show_id) as cnt_id from netflix_table) netflix_table;")
unique_id_count.show()

+------+
|cnt_id|
+------+
|  8809|
+------+



In [16]:
unique_id = sqlContext.sql("select distinct count(show_id) as cnt_id from netflix_table")
unique_id.show()

+------+
|cnt_id|
+------+
|  8809|
+------+



In [17]:
# Count of distinct rows (not distinct show_id values) using the Spark DataFrame API
netflix_spark_df.distinct().count()

8809
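
Both SQL queries above apply `DISTINCT` to the single row that `count(show_id)` returns, so they report the number of non-null `show_id` values, and the DataFrame call counts distinct rows rather than distinct IDs. The difference is plain SQL semantics, so the sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with a tiny invented table that contains one duplicate and one null.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE netflix_table (show_id TEXT)")
# Invented sample rows: "s2" appears twice and one show_id is missing.
conn.executemany(
    "INSERT INTO netflix_table VALUES (?)",
    [("s1",), ("s2",), ("s2",), (None,)],
)

# DISTINCT over the one-row result of COUNT: counts every non-null show_id.
non_null_count = conn.execute(
    "SELECT DISTINCT COUNT(show_id) AS cnt_id FROM netflix_table"
).fetchone()[0]

# DISTINCT inside COUNT: counts each show_id value once.
unique_count = conn.execute(
    "SELECT COUNT(DISTINCT show_id) AS cnt_id FROM netflix_table"
).fetchone()[0]

print(non_null_count, unique_count)  # 3 2
conn.close()
```

In the notebook, the corresponding query would be `sqlContext.sql("select count(distinct show_id) as cnt_id from netflix_table")`; its result on this file was not run here.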

#### - GroupBy type, release_year and count of show_id using the Spark DataFrame API

In [18]:
netflix_spark_df.groupBy("type").count().show()

+-------------+-----+
|         type|count|
+-------------+-----+
|         null|    1|
|      TV Show| 2676|
|        Movie| 6131|
|William Wyler|    1|
+-------------+-----+
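
The stray `type` values in the output above (`null`, `William Wyler`), together with the Spark row count of 8809 against 8807 rows in pandas, suggest that some CSV records are being split or shifted during parsing. One plausible cause, not confirmed for this file, is quoted fields that contain commas or line breaks. The sketch below uses Python's standard `csv` module and an invented three-line sample to show how a parser that ignores quoting produces extra rows and misplaced columns.

```python
import csv
import io

# Invented sample: the first record has a quoted comma and a quoted line break.
raw = (
    "show_id,type,title,description\n"
    's1,Movie,"Title, With Comma","Line one\nline two"\n'
    "s2,TV Show,Plain Title,Short text\n"
)

# Naive parsing: split on newlines, then on commas, ignoring quotes.
naive_rows = [line.split(",") for line in raw.strip().split("\n")]

# Quote-aware parsing: quoted commas and newlines stay inside one field.
parsed_rows = list(csv.reader(io.StringIO(raw)))

print(len(naive_rows))    # 4 (the quoted line break created an extra row)
print(len(parsed_rows))   # 3 (header plus two records)
print(naive_rows[1][3])   # ' With Comma"' lands where the description should be
print(parsed_rows[1][2])  # Title, With Comma
```

For the Spark reader, the `multiLine` and `escape` CSV options (for example `.options(header="true", multiLine="true", escape='"')`) are the settings usually worth trying in this situation; whether they bring the count back to 8807 for this dataset would need to be checked.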



In [19]:
#GroupBy on multiple columns
netflix_spark_df.groupBy("type","release_year").count().show()

+-------+---------------+-----+
|   type|   release_year|count|
+-------+---------------+-----+
|  Movie|  June 12, 2021|    1|
|  Movie|           1963|    1|
|TV Show|           1981|    1|
|  Movie|           1971|    5|
|TV Show|           1972|    1|
|TV Show|           1988|    2|
|TV Show|  Nse Ikpe-Etim|    1|
|  Movie|           1956|    2|
|  Movie| Charles Rocket|    1|
|  Movie|           1997|   33|
|  Movie|           2015|  397|
|  Movie|           1969|    2|
|  Movie|           2010|  153|
|  Movie|           1993|   24|
|  Movie|           1977|    6|
|TV Show|           2020|  436|
|TV Show|           1997|    4|
|  Movie|           2016|  657|
|  Movie|           1992|   20|
|TV Show|           1945|    1|
+-------+---------------+-----+
only showing top 20 rows



#### - GroupBy type, release_year and count of show_id using Spark SQL queries

In [20]:
grp_data= sqlContext.sql("select type, release_year, count(show_id) as cnt_id from netflix_table GROUP BY type, release_year")
grp_data.show()


+-------+---------------+------+
|   type|   release_year|cnt_id|
+-------+---------------+------+
|  Movie|  June 12, 2021|     1|
|  Movie|           1963|     1|
|TV Show|           1981|     1|
|  Movie|           1971|     5|
|TV Show|           1972|     1|
|TV Show|           1988|     2|
|TV Show|  Nse Ikpe-Etim|     1|
|  Movie|           1956|     2|
|  Movie| Charles Rocket|     1|
|  Movie|           1997|    33|
|  Movie|           2015|   397|
|  Movie|           1969|     2|
|  Movie|           2010|   153|
|  Movie|           1993|    24|
|  Movie|           1977|     6|
|TV Show|           2020|   436|
|TV Show|           1997|     4|
|  Movie|           2016|   657|
|  Movie|           1992|    20|
|TV Show|           1945|     1|
+-------+---------------+------+
only showing top 20 rows



#### - Update duration values: "90 min" becomes 90, "2 Seasons" becomes 2, and so on

[ref link](https://www.geeksforgeeks.org/split-single-column-into-multiple-columns-in-pyspark-dataframe/)

In [21]:
# Split the duration string on spaces, e.g. "90 min" -> ["90", "min"]
split_cols = pyspark.sql.functions.split(netflix_spark_df['duration'], ' ')

In [22]:
# Keep the first token (the number) in a new column
netflix_spark_df = netflix_spark_df.withColumn('upd_duration', split_cols.getItem(0))

In [23]:
netflix_spark_df.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|upd_duration|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|          90|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After

In [31]:
# Select the upd_duration column (already an int here because the cast in cell In [27] was run first)
netflix_spark_df.select("upd_duration")

DataFrame[upd_duration: int]

In [32]:
netflix_spark_df.select("upd_duration").show()

+------------+
|upd_duration|
+------------+
|          90|
|           2|
|           1|
|           1|
|           2|
|           1|
|          91|
|         125|
|           9|
|         104|
|           1|
|           1|
|         127|
|          91|
|           1|
|           4|
|          67|
|           2|
|          94|
|           1|
+------------+
only showing top 20 rows



In [33]:
# update_dur = sqlContext.sql("select dbo.GetNumericValue(duration) AS updated_duration from netflix_table")
# update_dur.show()
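
The commented-out query above calls `dbo.GetNumericValue`, which does not appear to be a Spark SQL built-in. As an alternative to both that function and the split-based approach, the leading number can be extracted with a regular expression. The sketch below is an illustration only: `parse_duration` shows the rule in plain Python, and `with_numeric_duration` is a hypothetical helper that applies the same pattern to a Spark DataFrame with `regexp_extract`.

```python
import re

_LEADING_INT = re.compile(r"^\s*(\d+)")


def parse_duration(value):
    """Return the leading integer of strings like '90 min' or '2 Seasons', else None."""
    if value is None:
        return None
    match = _LEADING_INT.match(value)
    return int(match.group(1)) if match else None


def with_numeric_duration(df):
    """Add an integer upd_duration column to a Spark DataFrame (hypothetical helper)."""
    # Lazy import: keeps the pure-Python part usable without PySpark installed.
    from pyspark.sql import functions as F

    digits = F.regexp_extract(F.col("duration"), r"^\s*(\d+)", 1)
    # regexp_extract yields "" when nothing matches; map that case to null
    # instead of casting an empty string.
    return df.withColumn(
        "upd_duration", F.when(digits != "", digits.cast("int"))
    )


print(parse_duration("90 min"))     # 90
print(parse_duration("2 Seasons"))  # 2
print(parse_duration("TV-MA"))      # None
```

Unlike taking the first token of a split, this leaves rows whose `duration` holds shifted or malformed text as null rather than carrying a non-numeric string forward.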


#### - groupby type and avg durations

[ref link](https://towardsdatascience.com/change-column-type-pyspark-df-eecbe726fdbc)


In [27]:
# Cast upd_duration from string to integer so it can be averaged
netflix_spark_df = netflix_spark_df.withColumn('upd_duration', col('upd_duration').cast(IntegerType()))

In [34]:
# Average duration per type (minutes for movies, seasons for TV shows)
netflix_spark_df.groupBy("type").avg('upd_duration').show()

+-------------+------------------+
|         type| avg(upd_duration)|
+-------------+------------------+
|         null|              null|
|      TV Show|1.7654320987654322|
|        Movie| 99.88907068062828|
|William Wyler|              null|
+-------------+------------------+



#### iii. Load: Save analysis report
- Save as tables, partitioned by type

[ref link](https://sparkbyexamples.com/pyspark/pyspark-partitionby-example/)

In [35]:
# save spark dataframe as table
netflix_spark_df.write.partitionBy('type').saveAsTable('netflix_trans_table')

In [30]:
# Save the transformed Spark DataFrame as CSV files partitioned by the 'type' column
netflix_spark_df.write.option("header",True) \
        .partitionBy("type") \
        .mode("overwrite") \
        .csv("./output")
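
Because `partitionBy("type")` creates one partition per distinct value, the stray `type` values seen in the group-by output earlier (such as `William Wyler`) would each get their own partition. A minimal sketch of filtering to the expected values before writing is shown below; the helper name `write_partitioned`, the `VALID_TYPES` tuple, and the `./output_clean` path are assumptions made for illustration.

```python
# The two type values that the dataset is expected to contain.
VALID_TYPES = ("Movie", "TV Show")


def write_partitioned(df, path="./output_clean"):
    """Write only rows with a valid type as CSV, partitioned by type (hypothetical helper)."""
    # Lazy import: keeps the module importable without PySpark installed.
    from pyspark.sql import functions as F

    # Rows with a null or unexpected type are dropped by this filter.
    clean = df.filter(F.col("type").isin(*VALID_TYPES))
    (
        clean.write
        .option("header", True)
        .partitionBy("type")
        .mode("overwrite")
        .csv(path)
    )
    return clean
```

Dropping the malformed rows hides the parsing problem rather than fixing it, so this is best treated as a safeguard on the output, not a replacement for reading the CSV correctly.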