<a href="https://colab.research.google.com/github/shubhanshu-26/Big_data/blob/main/Data-migration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Migrate data from RDD to dataframe
# option 1 Using Reflection based approach
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
spark = SparkSession \
                        .builder \
                        .master("local") \
                        .appName("shub-demo") \
                        .getOrCreate() \


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
movies_path = "/content/drive/MyDrive/Spark-datasets/movies.csv"
movies_rdd = spark.sparkContext.textFile(movies_path)
movies_rdd.take(5)

['movieId,title,genres',
 '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance']

In [None]:
ratings_path = "/content/drive/MyDrive/Spark-datasets/ratings.csv"
ratings_rdd = spark.sparkContext.textFile(ratings_path)
ratings_rdd.take(5)

['userId,movieId,rating,timestamp',
 '1,296,5.0,1147880044',
 '1,306,3.5,1147868817',
 '1,307,5.0,1147868828',
 '1,665,5.0,1147878820']

In [None]:
from pyspark.sql import Row
parts1 = movies_rdd.map(lambda x:x.split(","))
parts2 = ratings_rdd.map(lambda x:x.split(","))

schemaMovies = parts1.map(lambda p: Row(movieId = (p[0]), title = (p[1]), genres = (p[2])))
schemaRating = parts2.map(lambda p: Row(userId = (p[0]), movieId = (p[1]), rating = (p[2]), timestamp = (p[3])))

movie_df = spark.createDataFrame(schemaMovies)
movie_df.createOrReplaceTempView("movies")

rating_df = spark.createDataFrame(schemaRating)
rating_df.createOrReplaceTempView("ratings")

spark.sql("""select m.title, avg(r.rating) as avg_rating, first(r.timestamp) as first_timestamp, count(r.rating) as rating_count
              from movies m
              join ratings r
              on m.movieId = r.movieId
              where r.rating = 5 and m.genres like '%Comedy%'
              group by m.title
              order by rating_count""").show(5)

+--------------------+----------+---------------+------------+------+
|               title|avg_rating|first_timestamp|rating_count|count1|
+--------------------+----------+---------------+------------+------+
| (Girl)Friend (2018)|       5.0|     1563617667|           1|     1|
| #realityhigh (2017)|       5.0|     1530679271|           1|     1|
|      '49-'17 (1917)|       5.0|     1535634881|           1|     1|
|...E fuori nevica...|       5.0|     1546451439|           1|     1|
|"Gabriel ""Fluffy...|       5.0|     1549519332|           1|     1|
+--------------------+----------+---------------+------------+------+
only showing top 5 rows



In [None]:
spark.sql("""
SELECT m.title, m.genres, AVG(r.rating) AS avg_rating
FROM movies m
JOIN ratings r ON m.movieId = r.movieId
GROUP BY m.title, m.genres
ORDER BY avg_rating DESC
""").show()

+--------------------+--------------------+----------+
|               title|              genres|avg_rating|
+--------------------+--------------------+----------+
|Room Full of Spoo...|         Documentary|       5.0|
|Strike Commando (...|              Action|       5.0|
|     Clearcut (1991)|Drama|Thriller|We...|       5.0|
|               "Love| Cheat & Steal (1...|       5.0|
|...E fuori nevica...|              Comedy|       5.0|
|      Solanin (2010)|       Drama|Romance|       5.0|
|"Pioneers of Elec...| Volume 1: Richie...|       5.0|
|A Lustful Man (1961)|        Comedy|Drama|       5.0|
|Big Man - An Unus...|  (no genres listed)|       5.0|
|Motion Picture ('...|  (no genres listed)|       5.0|
|The Unfaithfuls (...|  (no genres listed)|       5.0|
|The Portuguese Ki...|  (no genres listed)|       5.0|
|   The Wicked (2014)|              Horror|       5.0|
|Banana Paradise (...|  (no genres listed)|       5.0|
|      Peacock (2015)|Comedy|Drama|Romance|       5.0|
|Top Secre

In [None]:
# option 2 Using Progmmatic way

schemaString = "id name category"
movies_rdd.take(5)

['movieId,title,genres',
 '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance']

In [None]:
parts = movies_rdd.map(lambda x:x.split(","))
parts.take(5)

[['movieId', 'title', 'genres'],
 ['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'],
 ['2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'],
 ['3', 'Grumpier Old Men (1995)', 'Comedy|Romance'],
 ['4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance']]

In [None]:
cleaned_rdd = parts.map(lambda p: (p[0], p[1], p[2].strip()))
cleaned_rdd.take(5)

[('movieId', 'title', 'genres'),
 ('1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 ('2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 ('3', 'Grumpier Old Men (1995)', 'Comedy|Romance'),
 ('4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance')]

In [None]:
from pyspark.sql.types import StructField,StringType,StringType,StructType

fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema = StructType(fields)

In [None]:
my_df = spark.createDataFrame(cleaned_rdd, schema)
my_df.createOrReplaceTempView("movies")

In [None]:
spark.sql("""select *
             from movies
             where category = 'Action'
             Order by name""").show(25)

+------+--------------------+--------+
|    id|                name|category|
+------+--------------------+--------+
|149562|'Pimpernel' Smith...|  Action|
|171027|    11 Blocks (2015)|  Action|
|142228|12 Rounds 3: Lock...|  Action|
| 86142|13 Assassins (Jûs...|  Action|
|176829|       6 Days (2017)|  Action|
|128482|7 Hours of Violen...|  Action|
|150864|  80 Milionów (2011)|  Action|
|183365|A Deadly Obsessio...|  Action|
|144286|   A Good Man (2014)|  Action|
|129655|A Nightingale San...|  Action|
|173313|     A2 Racer (2004)|  Action|
|152340|Aap Mujhe Achche ...|  Action|
|123034|Aces Go Places V:...|  Action|
|  2817|Aces: Iron Eagle ...|  Action|
|201977|Action Figures 2 ...|  Action|
|179821|Acts of Vengeance...|  Action|
|183419|Acts of Violence ...|  Action|
|136918|  Age Of Kill (2015)|  Action|
| 93923|  Agent Vinod (2012)|  Action|
|201196|     Airborne (1998)|  Action|
|154947|Airplane vs Volca...|  Action|
|197613|All the Devil's M...|  Action|
|126420|American Heist (2