<a href="https://colab.research.google.com/github/saisumanthkorada/PysprkTransformations/blob/main/Transforming_the_file.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark
!pip install openpyxl




In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import split, col,length,right,expr,regexp_extract,regexp_replace,trim,explode,first,lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
schema = StructType([
    StructField("movieId",IntegerType(),True),
    StructField("title",StringType(),True),
    StructField("generes",StringType(),True)
])
df1 = spark.read.format('csv')\
.option('header',True)\
.schema(schema)\
.load('/content/drive/MyDrive/ml-latest/movies.csv')
df1.show()

+-------+--------------------+--------------------+
|movieId|               title|             generes|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [6]:
df1 = df1.withColumn(
  "year",
  regexp_extract(col("title"), r"\((\d{4})\)$", 1)
)
df1=df1.withColumn("title",
    trim(regexp_replace(col("title"), r"\s*\(\d{4}\)$", "")))

In [7]:
df1 = df1.withColumn("genersarray",split(col('generes'), r"\s*\|\s*"))

In [8]:
df1.show()

+-------+--------------------+--------------------+----+--------------------+
|movieId|               title|             generes|year|         genersarray|
+-------+--------------------+--------------------+----+--------------------+
|      1|           Toy Story|Adventure|Animati...|1995|[Adventure, Anima...|
|      2|             Jumanji|Adventure|Childre...|1995|[Adventure, Child...|
|      3|    Grumpier Old Men|      Comedy|Romance|1995|   [Comedy, Romance]|
|      4|   Waiting to Exhale|Comedy|Drama|Romance|1995|[Comedy, Drama, R...|
|      5|Father of the Bri...|              Comedy|1995|            [Comedy]|
|      6|                Heat|Action|Crime|Thri...|1995|[Action, Crime, T...|
|      7|             Sabrina|      Comedy|Romance|1995|   [Comedy, Romance]|
|      8|        Tom and Huck|  Adventure|Children|1995|[Adventure, Child...|
|      9|        Sudden Death|              Action|1995|            [Action]|
|     10|           GoldenEye|Action|Adventure|...|1995|[Action,

In [9]:
df_exploded = df1.withColumn("generes",explode("genersarray"))

In [10]:
df_exploded.show()

+-------+--------------------+---------+----+--------------------+
|movieId|               title|  generes|year|         genersarray|
+-------+--------------------+---------+----+--------------------+
|      1|           Toy Story|Adventure|1995|[Adventure, Anima...|
|      1|           Toy Story|Animation|1995|[Adventure, Anima...|
|      1|           Toy Story| Children|1995|[Adventure, Anima...|
|      1|           Toy Story|   Comedy|1995|[Adventure, Anima...|
|      1|           Toy Story|  Fantasy|1995|[Adventure, Anima...|
|      2|             Jumanji|Adventure|1995|[Adventure, Child...|
|      2|             Jumanji| Children|1995|[Adventure, Child...|
|      2|             Jumanji|  Fantasy|1995|[Adventure, Child...|
|      3|    Grumpier Old Men|   Comedy|1995|   [Comedy, Romance]|
|      3|    Grumpier Old Men|  Romance|1995|   [Comedy, Romance]|
|      4|   Waiting to Exhale|   Comedy|1995|[Comedy, Drama, R...|
|      4|   Waiting to Exhale|    Drama|1995|[Comedy, Drama, R

In [11]:
df_final = df_exploded.groupBy("movieId","title","year").pivot("generes").agg(first(lit("yes")))

In [12]:
l= df_final.columns

In [13]:
for k in l:
  if "(" in k:
    df_final = df_final.drop(k)

In [14]:
df_final = df_final.fillna("no")
df_final.show()

+-------+--------------------+----+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|movieId|               title|year|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+--------------------+----+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|      4|   Waiting to Exhale|1995|    no|       no|       no|      no|   yes|   no|         no|  yes|     no|       no|    no|  no|     no|     no|    yes|    no|      no| no|     no|
|      5|Father of the Bri...|1995|    no|       no|       no|      no|   yes|   no|         no|   no|     no|       no|    no|  no|     no|     no|     no|    no|      no| no|     no|
|      6|                Heat|1995|   yes|       no|       no|      no|    

In [15]:
df_final = df_final.dropDuplicates()

In [16]:
df_final = df_final.orderBy('movieId')

In [17]:
df_final.show()

+-------+--------------------+----+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|movieId|               title|year|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+--------------------+----+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|      1|           Toy Story|1995|    no|      yes|      yes|     yes|   yes|   no|         no|   no|    yes|       no|    no|  no|     no|     no|     no|    no|      no| no|     no|
|      2|             Jumanji|1995|    no|      yes|       no|     yes|    no|   no|         no|   no|    yes|       no|    no|  no|     no|     no|     no|    no|      no| no|     no|
|      3|    Grumpier Old Men|1995|    no|       no|       no|      no|   y

In [None]:
df_final.write.format('csv').option('header','True').mode('overwrite').save('/content/drive/MyDrive/Silver_Layer/movies_transformed.csv')

In [18]:
schema2 = StructType([StructField("userId",IntegerType(),True),
                      StructField("movieId",IntegerType(),True),
                      StructField("rating",FloatType(),True),
                      StructField("timestamp",IntegerType(),True)])
df2 = spark.read.format('csv')\
.option('header',True)\
.schema(schema2)\
.load('/content/drive/MyDrive/ml-latest/ratings.csv')

In [25]:
df2.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      1|   4.0|1225734739|
|     1|    110|   4.0|1225865086|
|     1|    158|   4.0|1225733503|
|     1|    260|   4.5|1225735204|
|     1|    356|   5.0|1225735119|
|     1|    381|   3.5|1225734105|
|     1|    596|   4.0|1225733524|
|     1|   1036|   5.0|1225735626|
|     1|   1049|   3.0|1225734079|
|     1|   1066|   4.0|1225736961|
|     1|   1196|   3.5|1225735441|
|     1|   1200|   3.5|1225735861|
|     1|   1210|   4.5|1225735210|
|     1|   1214|   4.0|1225736426|
|     1|   1291|   5.0|1225734809|
|     1|   1293|   2.0|1225733842|
|     1|   1376|   3.0|1225733539|
|     1|   1396|   3.0|1225733534|
|     1|   1537|   4.0|1225736687|
|     1|   1909|   3.0|1225733717|
+------+-------+------+----------+
only showing top 20 rows



In [19]:
df2 = df2.withColumn("timestamp",F.from_unixtime("timestamp").cast("timestamp"))

In [20]:
df2 = df2.withColumn("date",F.date_format("timestamp","yyyy-mm-dd"))

In [21]:
df2 = df2.drop("year")

In [22]:
df_ratings = df2

In [23]:
df_ratings.show(20)

+------+-------+------+-------------------+----------+
|userId|movieId|rating|          timestamp|      date|
+------+-------+------+-------------------+----------+
|     1|      1|   4.0|2008-11-03 17:52:19|2008-52-03|
|     1|    110|   4.0|2008-11-05 06:04:46|2008-04-05|
|     1|    158|   4.0|2008-11-03 17:31:43|2008-31-03|
|     1|    260|   4.5|2008-11-03 18:00:04|2008-00-03|
|     1|    356|   5.0|2008-11-03 17:58:39|2008-58-03|
|     1|    381|   3.5|2008-11-03 17:41:45|2008-41-03|
|     1|    596|   4.0|2008-11-03 17:32:04|2008-32-03|
|     1|   1036|   5.0|2008-11-03 18:07:06|2008-07-03|
|     1|   1049|   3.0|2008-11-03 17:41:19|2008-41-03|
|     1|   1066|   4.0|2008-11-03 18:29:21|2008-29-03|
|     1|   1196|   3.5|2008-11-03 18:04:01|2008-04-03|
|     1|   1200|   3.5|2008-11-03 18:11:01|2008-11-03|
|     1|   1210|   4.5|2008-11-03 18:00:10|2008-00-03|
|     1|   1214|   4.0|2008-11-03 18:20:26|2008-20-03|
|     1|   1291|   5.0|2008-11-03 17:53:29|2008-53-03|
|     1|  

In [27]:
from IPython.display import Markdown, display



In [28]:
display(Markdown("## Retrieving the movie names using broadcast variables "))


## Retrieving the movie names using broadcast variables 

In [29]:
movie_dict = dict(df_final.rdd.map(lambda x: (x['movieId'], x['title'])).collect())

In [30]:
movie_b = spark.sparkContext.broadcast(movie_dict)

In [31]:
from pyspark.sql import Row

ratings_with_names_rdd = df_ratings.rdd.map(
    lambda row: Row(
        user_id=row['userId'],
        movie_id=row['movieId'],
        rating=row['rating'],
        date=row['date'],
        movie_name=movie_b.value.get(row['movieId'], 'Unknown')
    )
)


In [32]:
df_final_ratings_withnames = spark.createDataFrame(ratings_with_names_rdd)

In [38]:
display(Markdown("## Finding the average ratings of movies   "))


## Finding the average ratings of movies   

In [42]:
df_final_ratings_withnames.groupBy("movie_id","movie_name").agg(F.avg("rating").alias("avg_rating")).orderBy("movie_id",ascending=False).show(20)

+--------+--------------------+----------+
|movie_id|          movie_name|avg_rating|
+--------+--------------------+----------+
|  288983|UNZIPPED: An Auto...|       3.0|
|  288977|Skinford: Death S...|       3.0|
|  288975|The Men Who Made ...|       4.0|
|  288971|         Ouija Japan|       0.5|
|  288967|State of Siege: T...|       3.5|
|  288965|            Камертон|       2.5|
|  288959|Letters Of Happiness|       2.0|
|  288957|     Ballet Of Blood|       1.0|
|  288955|     Agata's Friends|       2.0|
|  288953|    The Eyes Have It|       1.5|
|  288951|    A Taste of Whale|       3.5|
|  288949|Eldorado: Everyth...|       0.5|
|  288947|The Year I Starte...|       0.5|
|  288945|Mr. Car and the K...|      0.75|
|  288943|         The Mount 2|       1.5|
|  288941|       Mixed Baggage|      4.75|
|  288939|      Wedding Season|      4.25|
|  288935|          Kosovolove|       2.0|
|  288931|     V for Vengeance|       4.5|
|  288927|            Fortunes|       2.5|
+--------+-

In [35]:
df_final_ratings = df_final_ratings_withnames

In [36]:
df_final_ratings.write.format('csv').option('header','True').mode('overwrite').save('/content/drive/MyDrive/Silver_Layer/ratings_transformed.csv')