# <font color='blue'>Data Pipeline using PySpark</font>
---

**Outline**

1. Review:
    - Introduction to Apache Spark & PySpark I
    - Introduction to Apache Spark & PySpark II
2. Case Study: Data Pipeline for Movie Data

### **Create Spark Session**
---

We name the session `"Data Pipeline using PySpark"`.

In [1]:
# import library
import pyspark

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .appName("Data Pipeline using PySpark") \
    .getOrCreate()

In [4]:
spark

### **1. Extract Process**
---

#### **1a**
---

- First, read the CSV file `data/ratings.csv` with PySpark
- Save the result in the `df_ratings` variable

In [5]:
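import os

def read_data_csv(PATH: str, filename: str):
    """
    Read a CSV file (with a header row) from PATH into a Spark DataFrame.
    """
    try:
        # Use the PATH argument rather than the global DATA_PATH
        data = spark.read.option("header", "true").csv(os.path.join(PATH, filename))
        return data
    except Exception as e:
        print(f"ERROR : {e}")
        raise  # re-raise so callers don't silently receive None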

In [6]:
DATA_PATH = '../data/'
filename = 'ratings.csv'
df_ratings = read_data_csv(DATA_PATH, filename)
df_ratings.show(3)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
+------+-------+------+----------+
only showing top 3 rows



In [7]:
df_ratings.count()

26024289

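#### **1b**
---

- Read the movie metadata from the `movies_metadata` table in PostgreSQL using Spark's JDBC reader
- Load the connection settings from a `.env` file with `python-dotenv`
- Save the result in the `df_metadata` variable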
In [8]:
%pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.1.1
Note: you may need to restart the kernel to use updated packages.


In [9]:
from dotenv import load_dotenv
import os

In [10]:
# Load .env file
load_dotenv()

# Access variables
db_host = os.getenv("POSTGRES_URL_MOVIE")
db_name = os.getenv("POSTGRES_DB_MOVIE")
db_user = os.getenv("POSTGRES_USER_MOVIE")
db_pass = os.getenv("POSTGRES_PASSWORD_MOVIE")
conn_properties = {
    "user": db_user,
    "password": db_pass,
    "driver": "org.postgresql.Driver" # set driver postgres
}
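
`spark.read.jdbc` expects a full JDBC URL in its `url` argument, so `POSTGRES_URL_MOVIE` must hold a value shaped like `jdbc:postgresql://<host>:<port>/<database>`. If your `.env` stores the parts separately, you could assemble the URL yourself. A minimal sketch, where `build_jdbc_url` and the host, port, and database values are hypothetical placeholders:

```python
def build_jdbc_url(host, port, database):
    """Assemble a PostgreSQL JDBC URL (hypothetical helper, not part of any library)."""
    return f"jdbc:postgresql://{host}:{port}/{database}"

# Placeholder values; replace them with your own connection details
print(build_jdbc_url("localhost", 5432, "movies"))
# jdbc:postgresql://localhost:5432/movies
```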

In [11]:
TABLE_NAME = 'movies_metadata'
df_metadata = spark.read.jdbc(url=db_host,table = TABLE_NAME, properties=conn_properties)

In [12]:
df_metadata.show(3, truncate = False, vertical = True)

-RECORD 0------------------------------------------
 adult                 | False
 belongs

In [13]:
df_metadata.count()

45466

### **2. Source to Target Mapping**
---

- After extracting the data, we found that the ratings and the movie metadata can be joined on the movie ID
- Before joining, we profile each table; the profile guides the next steps
- You can create your own Source to Target Mapping or use the one in this [Spreadsheet](https://docs.google.com/spreadsheets/d/1spFcpnUdoiKW2dApInxDNkadywiBs2mdCt8CmwHZ1ME/edit?usp=sharing)
- The goal of Source to Target Mapping is to identify which columns to carry into the next steps and which transformation rules to apply to them
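One way to make the mapping actionable is to store it as data and derive the transformation rules from it. The sketch below assumes a simplified mapping with only a few columns. The rows and the `derive_rules` helper are hypothetical and are not copied from the spreadsheet:

```python
# Hypothetical mapping rows: (source column, target column, target type)
MAPPING = [
    ("userId", "user_id", "int"),
    ("movieId", "movie_id", "int"),
    ("rating", "rating", "string"),
    ("budget", "budget", "float"),
    ("title", "title", "string"),
]

def derive_rules(mapping):
    """Split a source-to-target mapping into a rename map and a cast map."""
    # Only columns whose name changes need a rename rule
    rename = {src: tgt for src, tgt, _ in mapping if src != tgt}
    # Columns are read as strings, so only non-string targets need a cast
    cast = {tgt: typ for _, tgt, typ in mapping if typ != "string"}
    return rename, cast

rename_map, cast_map = derive_rules(MAPPING)
print(rename_map)  # {'userId': 'user_id', 'movieId': 'movie_id'}
print(cast_map)    # {'user_id': 'int', 'movie_id': 'int', 'budget': 'float'}
```

The two dictionaries have the same shape as `COLUMNS_RENAME` and `CAST_DATA` in the transform step, so the spreadsheet and the code stay in sync.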

### **3. Transform Data**
---

- With every column mapped in the Source to Target Mapping, we can now define the transformation rules
- The transformations in this case are:
    - Join Data
    - Renaming Columns
    - Select Data based on Columns
    - Casting Data Type
    - Filter Data
    - Create New Columns using Existing Columns
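Each step takes a dataset and returns a new one, so the whole transform stage can be written as an ordered list of functions. The following plain-Python sketch shows the idea. `run_pipeline` and the three step functions are hypothetical and work on lists of dicts, not Spark DataFrames. In PySpark, `DataFrame.transform(func)` supports the same kind of chaining:

```python
from functools import reduce

def run_pipeline(data, steps):
    """Apply each step in order, passing each step's output to the next."""
    return reduce(lambda acc, step: step(acc), steps, data)

# Hypothetical steps over plain dict rows (stand-ins for DataFrame transformations)
def rename_cols(rows):
    # Rename userId -> user_id and keep only the columns we need
    return [{"user_id": r["userId"], "rating": r["rating"]} for r in rows]

def cast_user_id(rows):
    return [{**r, "user_id": int(r["user_id"])} for r in rows]

def keep_high_ratings(rows):
    return [r for r in rows if float(r["rating"]) >= 4.0]

rows = [{"userId": "1", "rating": "4.5"}, {"userId": "2", "rating": "1.0"}]
print(run_pipeline(rows, [rename_cols, cast_user_id, keep_high_ratings]))
# [{'user_id': 1, 'rating': '4.5'}]
```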

#### **3a. Join Data**
---

- In this step, we join the movie ratings with the movie metadata
- Exploring the data in the previous step showed one pair of columns that can serve as the join key:
    - `df_ratings`: `movieId`
    - `df_metadata`: `id`

    <br>
    <center>
        <img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/join_data_case_week_5.png" width=50%>
    </center>
    <br>

- To join the data using PySpark, we can use this code

```python
df1.join(df2, df1.col == df2.col, JOIN_METHOD) # we can use inner, left, right, etc
```

- In this case, we use an `inner` join and save the joined data in the `df_data` variable
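An inner join keeps only the rows whose key appears on both sides. Ratings for a `movieId` with no matching metadata `id` are dropped, and the output keeps both key columns (`movieId` and `id`, as seen in the joined output). The following plain-Python model illustrates the semantics. `inner_join` and the sample rows are hypothetical, and Spark's actual join strategy differs:

```python
def inner_join(left, right, left_key, right_key):
    """Plain-Python model of an inner equi-join on lists of dict rows."""
    # Index the right side by its join key (like the build side of a hash join)
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    joined = []
    for row in left:
        # Rows without a match are skipped, which is what "inner" means.
        # Note: on a column-name clash this dict merge keeps the right value,
        # whereas Spark would keep both columns.
        for match in index.get(row[left_key], []):
            joined.append({**row, **match})
    return joined

ratings = [
    {"userId": "1", "movieId": "110", "rating": "1.0"},
    {"userId": "1", "movieId": "999999", "rating": "4.0"},  # no metadata row
]
metadata = [{"id": "110", "title": "Example title"}]

result = inner_join(ratings, metadata, "movieId", "id")
print(len(result))  # 1
```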

In [14]:
df_data = df_ratings.join(df_metadata, df_ratings.movieId == df_metadata.id, "inner")

In [15]:
df_data.show(5)

+------+-------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+-----+------------+----------+
|userId|movieId|rating| timestamp|adult|belongs_to_collection|budget|              genres|homepage|    id|  imdb_id|original_language|original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|video|vote_average|vote_count|
+------+-------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---

In [16]:
df_data.show(3, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 userId                | 429

#### **3b. Rename Columns**
---

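- After the join, some column names don't follow the snake_case naming used by the other columns
- We rename them in a single call with `DataFrame.withColumnsRenamed`, which is available in PySpark 3.4 and later
- The columns to rename are: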
    - `userId`: `user_id`
    - `movieId`: `movie_id`

In [17]:
COLUMNS_RENAME = {
    "userId": "user_id",
    "movieId": "movie_id"
}

df_data = df_data.withColumnsRenamed(colsMap=COLUMNS_RENAME)
df_data.show(1)

+-------+--------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+-----+------------+----------+
|user_id|movie_id|rating| timestamp|adult|belongs_to_collection|budget|              genres|homepage|    id|  imdb_id|original_language|original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|video|vote_average|vote_count|
+-------+--------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+----------

#### **3c. Select Columns**
---

- This step is straightforward
- The joined table has many columns, and we don't need all of them
- These are the columns the Data Scientist needs to build the Recommender System model:
    - `user_id`
    - `movie_id`
    - `rating`
    - `timestamp`
    - `adult`
    - `budget`
    - `genres`
    - `original_language`
    - `original_title`
    - `overview`
    - `popularity`
    - `production_companies`
    - `release_date`
    - `revenue`
    - `runtime`
    - `spoken_languages`
    - `status`
    - `tagline`
    - `title`
    - `vote_average`
    - `vote_count`
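`DataFrame.select` fails with an analysis error if any requested name does not exist. Checking the list against `df_data.columns` before selecting therefore gives a clearer message. A minimal sketch, where `missing_columns` is a hypothetical helper and the sample column lists are made up:

```python
def missing_columns(available, selected):
    """Return the selected column names that do not exist in the DataFrame."""
    existing = set(available)
    return [name for name in selected if name not in existing]

# In the notebook this could be called as:
# missing_columns(df_data.columns, SELECTED_COLUMNS)
available = ["user_id", "movie_id", "rating", "timestamp", "id", "title"]
selected = ["user_id", "movie_id", "rating", "vote_count"]
print(missing_columns(available, selected))  # ['vote_count']
```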
 

In [18]:
SELECTED_COLUMNS = [
        "user_id",
        "movie_id",
        "rating",
        "timestamp",
        "adult",
        "budget",
        "genres",
        "original_language",
        "original_title",
        "overview",
        "popularity",
        "production_companies",
        "release_date",
        "revenue",
        "runtime",
        "spoken_languages",
        "status",
        "tagline",
        "title",
        "vote_average",
        "vote_count"
    ]
df_data_filtered = df_data.select(SELECTED_COLUMNS)

In [19]:
df_data_filtered.show(1, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 user_id              | 429

#### **3d. Casting Data Type**
---

In [20]:
df_data_filtered.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- movie_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)



- Every column was read as `string`, so several columns don't have the correct data types
- In this step, we cast them to the appropriate types
- The columns to cast are:
    - `user_id` as `int`
    - `movie_id` as `int`
    - `budget` as `float`
    - `popularity` as `float`
    - `revenue` as `float`
    - `runtime` as `float`
    - `vote_average` as `float`
    - `vote_count` as `float`

- To cast a column, we combine the `withColumn` method with `Column.cast`
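With ANSI mode off (the default before Spark 4.0), casting a string that cannot be parsed produces null instead of raising an error. A failed cast can therefore turn values into nulls without any warning. The plain-Python model below shows that behavior. `cast_or_none` is hypothetical and ignores Spark's finer parsing rules:

```python
def cast_or_none(value, target_type):
    """Convert a string to int/float, returning None when it cannot be parsed."""
    converters = {"int": int, "float": float}
    try:
        return converters[target_type](value)
    except (TypeError, ValueError):
        # Unparseable or missing input becomes None, like a null after a failed cast
        return None

values = ["429", "0.769266", "not a number", None]
print([cast_or_none(v, "float") for v in values])
# [429.0, 0.769266, None, None]
```

After casting, it is worth comparing null counts per column before and after the cast to catch values that silently failed to convert.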

In [21]:
def casting_dtypes(data, cast_target):
    """Cast each column in cast_target (column name -> Spark type string)."""
    print("Start casting data types")
    for col_name, col_type in cast_target.items():
        data = data.withColumn(col_name, data[col_name].cast(col_type))
        print(f"Successfully cast column {col_name} to {col_type}")
    print("===========END========")
    return data

In [22]:
CAST_DATA = {
    "user_id":"int",
    "movie_id":"int",
    "budget":"float",
    "popularity":"float",
    "revenue":"float",
    "runtime":"float",
    "vote_average":"float",
    "vote_count":"float"
}

df_movies = casting_dtypes(df_data_filtered, CAST_DATA)

Start casting data types
Successfully cast column user_id to int
Successfully cast column movie_id to int
Successfully cast column budget to float
Successfully cast column popularity to float
Successfully cast column revenue to float
Successfully cast column runtime to float
Successfully cast column vote_average to float
Successfully cast column vote_count to float


In [23]:
df_movies.show(2,truncate=False, vertical=True)

-RECORD 0------------------------------------------
 user_id              | 429

In [24]:
df_movies.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- budget: float (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: float (nullable = true)
 |-- runtime: float (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: float (nullable = true)



#### **3e. Convert Unix Timestamp to Date Time**
---

- The `timestamp` column holds UNIX timestamps: the number of seconds elapsed since 1970-01-01 00:00:00 UTC

**Example of Unix Timestamp**

<center>
    <img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/example_unix.png" width=80%>
    <br>
    <a href="https://dev.to/ghost/level-up-unix-timestamp-5hnm">img source</a>
</center>

- These values are not human-readable, so we convert them to a date-time representation
- To do this, we use the PySpark function `from_unixtime` together with the `withColumn` method

    ```python
    from pyspark.sql.functions import from_unixtime

    # from_unixtime returns a string formatted as "yyyy-MM-dd HH:mm:ss" in the session time zone
    df = df.withColumn(col_name, from_unixtime(col_name))
    ```
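`from_unixtime` formats the seconds with the Spark session time zone (`spark.sql.session.timeZone`), and its result is a string, not a `TimestampType` column. The plain-Python equivalent below assumes the session time zone is UTC. If yours differs, Spark's output shifts accordingly. `from_unixtime_utc` is a hypothetical helper:

```python
from datetime import datetime, timezone

def from_unixtime_utc(seconds):
    """Format UNIX seconds like Spark's default 'yyyy-MM-dd HH:mm:ss', assuming UTC."""
    return datetime.fromtimestamp(int(seconds), tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# First rating in ratings.csv
print(from_unixtime_utc("1425941529"))  # 2015-03-09 22:52:09
```

If a true timestamp type is needed downstream, the string can be converted with `to_timestamp` or cast to `timestamp`.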

In [25]:
df_movies.show(1)

+-------+--------+------+----------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|user_id|movie_id|rating| timestamp|adult|budget|              genres|original_language|original_title|            overview|popularity|production_companies|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|vote_average|vote_count|
+-------+--------+------+----------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|    429|  100010|   1.0|1475231879|False|   0.0|[{'id': 18, 'name...|               en|Flight Command|A rookie flyer, E...|  0.769266|[{'name': 'Metro-...|  1940-12-27|   

In [26]:
from pyspark.sql.functions import from_unixtime

In [27]:
df_movies = df_movies.withColumn("timestamp", from_unixtime("timestamp"))

In [28]:
df_movies.show(1,truncate=False, vertical=True)

-RECORD 0------------------------------------------
 user_id              | 429

#### **3f. Filter Data**
---

In [29]:
# before filter

df_movies.count()

11437637

- As we can see, the data is large: about 11.4 million rows
- Because the budget for building the Recommender System model is limited, we cannot use all of the data
- So we filter it. The requirements are movies with a `release_date` on or after `2010-01-01`, and ratings whose `timestamp` is on or after `2017-01-01 00:00:00`
- To apply the filter, we pass a SQL expression string to `DataFrame.filter`
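Both `release_date` and the converted `timestamp` are strings here, so the filter compares strings. This works because zero-padded `yyyy-MM-dd` and `yyyy-MM-dd HH:mm:ss` values sort in chronological order. A row with a missing value fails the predicate, just as a NULL comparison does in SQL. The rows below are hypothetical:

```python
rows = [
    {"release_date": "2010-02-01", "timestamp": "2017-01-17 20:21:07"},
    {"release_date": "1940-12-27", "timestamp": "2017-03-15 13:42:57"},
    {"release_date": "2012-05-04", "timestamp": "2016-12-31 23:59:59"},
    {"release_date": None, "timestamp": "2017-06-01 00:00:00"},
]

def keep(row):
    # ISO-formatted strings compare chronologically; a missing date is dropped
    return (
        row["release_date"] is not None
        and row["release_date"] >= "2010-01-01"
        and row["timestamp"] >= "2017-01-01 00:00:00"
    )

filtered = [r for r in rows if keep(r)]
print(len(filtered))  # 1
```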

In [30]:
df_movies = df_movies.filter("release_date >='2010-01-01' and timestamp >='2017-01-01 00:00:00'")
df_movies.count()

29681

#### **3g. Create New Columns using Existing Columns**
---

- To enrich the data for the Recommender System model, we can derive new columns from existing ones
- In this case, we create a new column called `profit` by subtracting `budget` from `revenue`

    $$
    \text{profit} = \text{revenue} - \text{budget}
    $$
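As a worked check using the record shown in the validation step ($\text{revenue} = 2.26497216 \times 10^{8}$, $\text{budget} = 9.5 \times 10^{7}$), profit is $131{,}497{,}216$. Two details are worth keeping in mind. Spark arithmetic returns null if either operand is null, and many rows have a `budget` of `0.0` (as in the first rows shown earlier), which makes `profit` equal to `revenue` for them. A plain-Python model, where `profit` is a hypothetical helper:

```python
def profit(revenue, budget):
    """revenue - budget, returning None if either input is missing (like Spark's null)."""
    if revenue is None or budget is None:
        return None
    return revenue - budget

print(profit(2.26497216e8, 9.5e7))  # 131497216.0
print(profit(None, 9.5e7))          # None
```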


In [31]:
df_movies = df_movies.withColumn("profit", df_movies["revenue"] - df_movies["budget"])

In [32]:
df_movies.show(2)

+-------+--------+------+-------------------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+-------+------------------+------------+----------+------+
|user_id|movie_id|rating|          timestamp|adult|budget|              genres|original_language|original_title|            overview|popularity|production_companies|release_date|revenue|runtime|    spoken_languages|  status|tagline|             title|vote_average|vote_count|profit|
+-------+--------+------+-------------------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+-------+------------------+------------+----------+------+
|  22378|  100270|   5.0|2017-03-15 13:42:57|False|   0.0|[{'id': 35, 'name...|               es| Lobos de Arga|After 15 years, T...|  2.697544|       

Validate the result by filtering for rows where `profit` is greater than `0`:

In [34]:
df_movies.filter(df_movies['profit'] > 0).show(2, vertical=True)

-RECORD 0------------------------------------
 user_id              | 1846                 
 movie_id             | 32657                
 rating               | 4.5                  
 timestamp            | 2017-01-17 20:21:07  
 adult                | False                
 budget               | 9.5E7                
 genres               | [{'id': 12, 'name... 
 original_language    | en                   
 original_title       | Percy Jackson & t... 
 overview             | Accident prone te... 
 popularity           | 9.785895             
 production_companies | [{'name': 'Ingeni... 
 release_date         | 2010-02-01           
 revenue              | 2.26497216E8         
 runtime              | 118.0                
 spoken_languages     | [{'iso_639_1': 'e... 
 status               | Released             
 tagline              | Worlds Collide       
 title                | Percy Jackson & t... 
 vote_average         | 6.0                  
 vote_count           | 2079.0    

### **4. Load Data**
---

- After extracting, profiling, and transforming the data according to the requirements, the last step is to load it into a single destination
- In this case, we write the data to the `obt_rating_movies` table in the Postgres database
- The database configuration is the same as in the extract step

In [None]:
# Access variables
db_host = os.getenv("POSTGRES_URL_MOVIE")
db_name = os.getenv("POSTGRES_DB_MOVIE")
db_user = os.getenv("POSTGRES_USER_MOVIE")
db_pass = os.getenv("POSTGRES_PASSWORD_MOVIE")
conn_properties = {
    "user": db_user,
    "password": db_pass,
    "driver": "org.postgresql.Driver" # set driver postgres
}

In [None]:
# Write the final transformed DataFrame (df_movies); df_joined was never defined
df_movies.write.jdbc(url=db_host,
                     table="obt_rating_movies",
                     mode="overwrite",
                     properties=conn_properties)