## Data Transformation with PySpark

In [147]:
import os

# Location of the IMDB movie CSV. Set the IMDB_CSV_PATH environment variable to
# run the notebook on another machine; when it is unset, the original local path
# is used, so existing behaviour is unchanged.
SRC = os.environ.get(
    "IMDB_CSV_PATH",
    "/home/labuser/Downloads/Pandas_datasets/IMDB-Movie-Data.csv",
)

In [148]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TestApp").getOrCreate()

In [149]:
sc=spark.sparkContext

In [150]:
# Load the CSV, treating the first line as a header and letting Spark infer column types.
# NOTE(review): the inferred schema printed below types Year, Runtime (Minutes),
# Rating and Votes as string rather than numeric, which suggests some rows do not
# parse cleanly (possibly quoted fields containing commas or newlines) — TODO confirm
# and, if so, adjust the reader options or pass an explicit schema.
Df = spark.read.option("inferSchema",True).option("header",True).csv(SRC)

In [151]:
Df.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Runtime (Minutes): string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Votes: string (nullable = true)
 |-- Revenue (Millions): double (nullable = true)
 |-- Metascore: double (nullable = true)



In [152]:
Df.dtypes

[('Rank', 'int'),
 ('Title', 'string'),
 ('Genre', 'string'),
 ('Description', 'string'),
 ('Director', 'string'),
 ('Actors', 'string'),
 ('Year', 'string'),
 ('Runtime (Minutes)', 'string'),
 ('Rating', 'string'),
 ('Votes', 'string'),
 ('Revenue (Millions)', 'double'),
 ('Metascore', 'double')]

In [153]:
Df.show()

+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------+------+------------------+---------+
|Rank|               Title|               Genre|         Description|            Director|              Actors|                Year|Runtime (Minutes)|Rating| Votes|Revenue (Millions)|Metascore|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------+------+------------------+---------+
|   1|Guardians of the ...|Action,Adventure,...|A group of interg...|          James Gunn|Chris Pratt, Vin ...|                2014|              121|   8.1|757074|            333.13|     76.0|
|   2|          Prometheus|Adventure,Mystery...|Following clues t...|        Ridley Scott|Noomi Rapace, Log...|                2012|              124|     7|485820|            126.46|     65.0|
|   3|               Split|   

In [154]:
Df.rdd.getNumPartitions()

1

### Creating a Custom Partition

In [155]:
Df = Df.repartition(8)

In [156]:
Df.rdd.getNumPartitions()

8

In [157]:
# Print each StructField of the DataFrame schema on its own line.
for field in Df.schema.fields:
    print(field)

StructField('Rank', IntegerType(), True)
StructField('Title', StringType(), True)
StructField('Genre', StringType(), True)
StructField('Description', StringType(), True)
StructField('Director', StringType(), True)
StructField('Actors', StringType(), True)
StructField('Year', StringType(), True)
StructField('Runtime (Minutes)', StringType(), True)
StructField('Rating', StringType(), True)
StructField('Votes', StringType(), True)
StructField('Revenue (Millions)', DoubleType(), True)
StructField('Metascore', DoubleType(), True)


### Accessing the Spark Web UI URL

In [158]:
print(sc.uiWebUrl)

http://ip-172-31-5-77.ap-south-1.compute.internal:4040


In [159]:
# Same URL as the previous cell, reached through the session's public
# `sparkContext` property rather than the private `_sc` attribute.
print(spark.sparkContext.uiWebUrl)

http://ip-172-31-5-77.ap-south-1.compute.internal:4040


## Writing the SQL code

In [160]:
Df.createOrReplaceTempView("movie")

### Accessing the data from the table

In [161]:
SQL = "SELECT * FROM movie"
newDf = spark.sql(SQL)

In [162]:
newDf.show()

+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|Rank|               Title|               Genre|         Description|            Director|              Actors|Year|Runtime (Minutes)|Rating| Votes|Revenue (Millions)|Metascore|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
| 427|Sin City: A Dame ...|Action,Crime,Thri...|Some of Sin City'...|        Frank Miller|Mickey Rourke, Je...|2014|              102|   6.5|122185|             13.75|     46.0|
| 672|            Child 44|Crime,Drama,Thriller|A disgraced membe...|     Daniel Espinosa|Tom Hardy, Gary O...|2015|              137|   6.5| 47703|              1.21|     41.0|
| 782|           Self/less|Action,Mystery,Sc...|A dying real esta...|        Tarsem Singh|Ryan Reynolds, Na...

In [163]:
# Query the `movie` temp view for the single row whose rank is 6 and display it.
# NOTE(review): the query spells the column `RANK` while the schema names it `Rank`;
# it resolved here, presumably through Spark's default case-insensitive column
# resolution — confirm before relying on it.
WHERE = "SELECT * FROM movie WHERE RANK = 6"
whereDf = spark.sql(WHERE)
whereDf.show()

+----+--------------+--------------------+--------------------+-----------+--------------------+----+-----------------+------+-----+------------------+---------+
|Rank|         Title|               Genre|         Description|   Director|              Actors|Year|Runtime (Minutes)|Rating|Votes|Revenue (Millions)|Metascore|
+----+--------------+--------------------+--------------------+-----------+--------------------+----+-----------------+------+-----+------------------+---------+
|   6|The Great Wall|Action,Adventure,...|European mercenar...|Yimou Zhang|Matt Damon, Tian ...|2016|              103|   6.1|56036|             45.13|     42.0|
+----+--------------+--------------------+--------------------+-----------+--------------------+----+-----------------+------+-----+------------------+---------+



### Transformation of Data

In [164]:
import pyspark.sql.functions as fn

In [165]:
print(Df.dtypes)

[('Rank', 'int'), ('Title', 'string'), ('Genre', 'string'), ('Description', 'string'), ('Director', 'string'), ('Actors', 'string'), ('Year', 'string'), ('Runtime (Minutes)', 'string'), ('Rating', 'string'), ('Votes', 'string'), ('Revenue (Millions)', 'double'), ('Metascore', 'double')]


In [166]:
# Order the movies alphabetically by title, then keep only the three columns
# used by the examples that follow.
testDf = (
    Df.orderBy('Title')
      .select('Rank', 'Title', 'Director')
)

In [167]:
testDf.groupBy('Director').count().show()

+------------------+-----+
|          Director|count|
+------------------+-----+
|    Chan-wook Park|    1|
|George Tillman Jr.|    3|
|   Terrence Malick|    2|
|   Louis Leterrier|    4|
|  Jeremy Gillespie|    1|
|        Tim Burton|    4|
|        Sam Mendes|    3|
|    Matthew Vaughn|    4|
|     Glenn Ficarra|    3|
|         Billy Ray|    1|
|     James Mangold|    1|
|    Mike Birbiglia|    1|
| David F. Sandberg|    1|
|Paul W.S. Anderson|    6|
|      Jeff Nichols|    2|
|   Stephen Sommers|    1|
|  Steven Shainberg|    1|
|       Ben Stiller|    3|
|   François Simard|    1|
|Sam Taylor-Johnson|    1|
+------------------+-----+
only showing top 20 rows



In [168]:
testDf.show()

+----+--------------------+--------------------+
|Rank|               Title|            Director|
+----+--------------------+--------------------+
| 508|(500) Days of Summer|           Marc Webb|
| 119| 10 Cloverfield Lane|    Dan Trachtenberg|
| 697|            10 Years|        Jamie Linden|
| 112|    12 Years a Slave|       Steve McQueen|
| 818|           127 Hours|         Danny Boyle|
| 169|            13 Hours|         Michael Bay|
| 805|                1408|     Mikael Håfström|
| 923|            17 Again|         Burr Steers|
| 473|                2012|     Roland Emmerich|
| 248|  20th Century Women|          Mike Mills|
| 851|                  21|      Robert Luketic|
| 273|      21 Jump Street|           Phil Lord|
| 277|      22 Jump Street|           Phil Lord|
| 617|2307: Winter's Dream|         Joey Curtis|
| 876|      28 Weeks Later|Juan Carlos Fresn...|
| 959|      3 Days to Kill|                 McG|
| 431|            3 Idiots|     Rajkumar Hirani|
| 114|              

In [169]:
from datetime import datetime as dt
# Add a constant `last_updated` column. `dt.now()` is evaluated once, in this
# Python process, when the cell runs; the result is converted with `str()` and
# attached as a string literal, so every row carries the same text value.
testDf = testDf.withColumn('last_updated',fn.lit(str(dt.now())))

In [170]:
testDf.show()

+----+--------------------+--------------------+--------------------+
|Rank|               Title|            Director|        last_updated|
+----+--------------------+--------------------+--------------------+
| 508|(500) Days of Summer|           Marc Webb|2023-09-22 08:04:...|
| 119| 10 Cloverfield Lane|    Dan Trachtenberg|2023-09-22 08:04:...|
| 697|            10 Years|        Jamie Linden|2023-09-22 08:04:...|
| 112|    12 Years a Slave|       Steve McQueen|2023-09-22 08:04:...|
| 818|           127 Hours|         Danny Boyle|2023-09-22 08:04:...|
| 169|            13 Hours|         Michael Bay|2023-09-22 08:04:...|
| 805|                1408|     Mikael Håfström|2023-09-22 08:04:...|
| 923|            17 Again|         Burr Steers|2023-09-22 08:04:...|
| 473|                2012|     Roland Emmerich|2023-09-22 08:04:...|
| 248|  20th Century Women|          Mike Mills|2023-09-22 08:04:...|
| 851|                  21|      Robert Luketic|2023-09-22 08:04:...|
| 273|      21 Jump 

### Select with expression

In [171]:
EXPRESSION = "CAST(last_updated AS DATE) AS test"
testDf = testDf.selectExpr(EXPRESSION,'Title')

In [172]:
testDf.show()

+----------+--------------------+
|      test|               Title|
+----------+--------------------+
|2023-09-22|(500) Days of Summer|
|2023-09-22| 10 Cloverfield Lane|
|2023-09-22|            10 Years|
|2023-09-22|    12 Years a Slave|
|2023-09-22|           127 Hours|
|2023-09-22|            13 Hours|
|2023-09-22|                1408|
|2023-09-22|            17 Again|
|2023-09-22|                2012|
|2023-09-22|  20th Century Women|
|2023-09-22|                  21|
|2023-09-22|      21 Jump Street|
|2023-09-22|      22 Jump Street|
|2023-09-22|2307: Winter's Dream|
|2023-09-22|      28 Weeks Later|
|2023-09-22|      3 Days to Kill|
|2023-09-22|            3 Idiots|
|2023-09-22|                 300|
|2023-09-22|300: Rise of an E...|
|2023-09-22|                  31|
+----------+--------------------+
only showing top 20 rows



### Alias

In [173]:
testDf = testDf.select(fn.col("Title").alias("Movie"))

### Handling duplicate values

In [174]:
dir(testDf)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collect_as_arrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_joinAsOf',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_session',
 '_sort_cols',
 '_sql_ctx',
 '_support_repr_html',
 '_to_corrected_pandas_type',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'drop

In [175]:
testDf.count()

1000

In [176]:
testDf = testDf.dropDuplicates(['Movie'])

In [177]:
Df.columns

['Rank',
 'Title',
 'Genre',
 'Description',
 'Director',
 'Actors',
 'Year',
 'Runtime (Minutes)',
 'Rating',
 'Votes',
 'Revenue (Millions)',
 'Metascore']

### Dropping the Unwanted Column

In [178]:
df = Df.drop("Genre")

### Conditional in PySpark

- `if` → `when` (chain further `.when(...)` calls for `elif`)
- `else` → `otherwise`

In [179]:
from pyspark.sql.functions import *

In [144]:
# Label each movie by its rating: below 5 -> "Average", below 8 -> "Good",
# everything else -> "Best".
# `when` branches are tested in order and the first match wins, so the narrower
# `< 5` test has to come before `< 8`; in the reverse order every rating below 5
# is already captured by `< 8` and "Average" can never be produced.
# Rating was inferred as a string column, so it is cast explicitly before the
# numeric comparison instead of relying on implicit coercion.
# NOTE(review): rows whose Rating is null or non-numeric match neither branch
# and fall through to "Best" — confirm that is intended.
df = df.withColumn(
    "Rating_Value",
    when(col('Rating').cast('double') < 5, "Average")
    .when(col('Rating').cast('double') < 8, "Good")
    .otherwise("Best"),
)

In [145]:
# Keep the one-column projection under its own name instead of overwriting `df`:
# later cells still read other columns of `df` (e.g. `Title` in the UDF example),
# which would no longer exist if `df` were reduced to the rating label alone.
# The column name is spelled exactly as it was created ("Rating_Value").
ratingDf = df.select('Rating_Value')

In [180]:
df.show()

+----+--------------------+--------------------+----------------+--------------------+----+-----------------+------+------+------------------+---------+
|Rank|               Title|         Description|        Director|              Actors|Year|Runtime (Minutes)|Rating| Votes|Revenue (Millions)|Metascore|
+----+--------------------+--------------------+----------------+--------------------+----+-----------------+------+------+------------------+---------+
| 105|The Man from U.N....|In the early 1960...|     Guy Ritchie|Henry Cavill, Arm...|2015|              116|   7.3|202973|             45.43|     56.0|
| 492|     The Book of Eli|A post-apocalypti...|   Albert Hughes|Denzel Washington...|2010|              118|   6.9|241359|             94.82|     53.0|
| 884|                 Rio|When Blu, a domes...| Carlos Saldanha|Jesse Eisenberg, ...|2011|               96|   6.9|173919|            143.62|     63.0|
| 479|      Paint It Black|A young woman att...|   Amber Tamblyn|Alia Shawkat, Nan

## Creating a User Defined Function

In [185]:
def concat_shell(column):
    """Append the suffix "_shell" to a string value.

    Intended to be wrapped as a Spark UDF, where it is called once per row and
    receives ``None`` for SQL NULL values.

    Args:
        column: The string to extend, or ``None``.

    Returns:
        ``column + "_shell"``, or ``None`` when ``column`` is ``None`` so that
        null inputs stay null instead of raising ``TypeError``.
    """
    if column is None:
        return None
    return column + "_shell"

In [186]:
concat_shell("tes")

'tes_shell'

### Registering the User-Defined Function with PySpark

In [187]:
# Wrap the plain Python function as a Spark UDF that produces a string column.
# The return type is given as a DDL type string so this cell does not depend on
# `StringType` happening to be in scope via the wildcard import above.
my_uds = udf(concat_shell,"string")

### Using the User Defined Function

In [188]:
testDf = df.withColumn("new_col",my_uds(col("Title")))

In [189]:
testDf.select("new_col").show()

[Stage 159:>                                                        (0 + 1) / 1]

+--------------------+
|             new_col|
+--------------------+
|The House Bunny_s...|
|       Ant-Man_shell|
|  Tomorrowland_shell|
|         Taken_shell|
|Suite Française_s...|
|    Knocked Up_shell|
|           Joy_shell|
|The Huntsman: Win...|
|        Looper_shell|
|      The Town_shell|
|        Detour_shell|
|We Are Your Frien...|
|  Blood Father_shell|
|Secret in Their E...|
|X-Men: Apocalypse...|
|Hunt for the Wild...|
|      Stardust_shell|
|The Girl with the...|
|Couples Retreat_s...|
| Sausage Party_shell|
+--------------------+
only showing top 20 rows



                                                                                

## Caching

In [190]:
 CDf = spark.read.option("inferSchema",True).option("header",True).csv(SRC)

In [191]:
CDf.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Runtime (Minutes): string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Votes: string (nullable = true)
 |-- Revenue (Millions): double (nullable = true)
 |-- Metascore: double (nullable = true)



In [194]:
# Cache the freshly loaded DataFrame and keep the returned handle.
# NOTE(review): the CacheManager warning below ("already cached data") indicates
# this cell was executed more than once against the same data; re-run the
# notebook top to bottom before sharing so the recorded output is clean.
CDf = CDf.cache()

23/09/22 08:33:57 WARN CacheManager: Asked to cache already cached data.


# Joins in PySpark