## Problem 09

Weightage: 40

Join the trips data with the stations data to produce a denormalized table that contains every field from trips plus two extra columns: `startstationname` and `endstationname`.


## Data Description
All of the citibike trip data is available under **/public/citibike/trips**. It contains multiple folders, one for each month. Here is the schema.

```
root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: string (nullable = true)
 |-- endstationid: string (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
```
All of the citibike station data is available under **/public/citibike/stations**. 
```
root
 |-- stationid: long (nullable = true)
 |-- stationlatitude: string (nullable = true)
 |-- stationlongitude: string (nullable = true)
 |-- stationname: string (nullable = true)
```

## Output Requirements
* Place the result in the HDFS Directory 
```
/user/`whoami`/mock_test_02/problem09/solution
```
* Use Parquet File format with any number of files.
* The output must have the following column names and data types.
```
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: integer (nullable = true)
 |-- endstationid: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- startstationname: string (nullable = true)
 |-- endstationname: string (nullable = true)
```
* There are no requirements for sorting the data.

## Validation

Here are the self-validation steps:
* Run the following to check number of files.
```
hdfs dfs -ls /user/`whoami`/mock_test_02/problem09/solution
```
* Run this code to create a DataFrame named `data`.
```
import getpass
username = getpass.getuser()
data = spark.read. \
    parquet(f'/user/{username}/mock_test_02/problem09/solution')
```
* Run `data.printSchema()` to validate the data types of the fields.
```
root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: integer (nullable = true)
 |-- endstationid: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- startstationname: string (nullable = true)
 |-- endstationname: string (nullable = true)
```
* Run `data.count()` to validate the number of records. It should be **54462016**.
* Run `data.show()` to preview the data. Make sure all the data is showing up as expected.

In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName(f'Problem 09 | {username}'). \
    master('yarn'). \
    getOrCreate()

In [2]:
path1 = '/public/citibike/trips'
path2 = '/public/citibike/stations'

In [3]:
# No schema is supplied, so every CSV column is read as a string and cast later.
df3 = spark.read.csv(path1, header=True)

In [4]:
df4 = spark.read.json(path2)

In [5]:
# Attach the matching station row for each trip's start station.
joined_df = df3.join(df4, df3.startstationid == df4.stationid, "inner")

In [6]:
joined_df. \
select("stationname")

stationname
Avenue C & E 18 St
E 74 St & 1 Ave
Degraw St & Smith St
E 31 St & 3 Ave
Central Park West...
Frederick Douglas...
E 98 St & Park Ave
E 58 St & 3 Ave
Broadway & W 29 St
8 Ave & W 31 St


In [7]:
from pyspark.sql.functions import date_format, col, split

# Cast the string columns read from CSV to the required output types.
# Caveat: both name columns are produced by splitting the START station's name on "&",
# so endstationname is not looked up from endstationid.
load_df = joined_df.select(col("tripduration").cast("int"), col("starttime").cast("timestamp"),
                         col("stoptime").cast("timestamp"), col("startstationid").cast("int"), 
                         col("endstationid").cast("int"), col("bikeid").cast("int"), 
                         "usertype", "birthyear", col("gender").cast("int"),
                         date_format("starttime", "MM").cast("int").alias("month"),
                         split(col("stationname"), "&")[0].alias("startstationname"),
                         split(col("stationname"), "&")[1].alias("endstationname"))

In [8]:
# Build the target path from `username` (defined in the first cell) instead of hard-coding it.
load_df. \
coalesce(10).write.format('parquet'). \
mode('overwrite').save(f"/user/{username}/mock_test_02/problem09/solution")

In [9]:
!hdfs dfs -ls /user/`whoami`/mock_test_02/problem09/solution

Found 11 items
-rw-r--r--   3 itv002480 supergroup          0 2022-06-30 03:17 /user/itv002480/mock_test_02/problem09/solution/_SUCCESS
-rw-r--r--   3 itv002480 supergroup  115031008 2022-06-30 03:16 /user/itv002480/mock_test_02/problem09/solution/part-00000-159b18e6-a8c0-44f3-9bc8-996f0560951b-c000.snappy.parquet
-rw-r--r--   3 itv002480 supergroup  238916260 2022-06-30 03:17 /user/itv002480/mock_test_02/problem09/solution/part-00001-159b18e6-a8c0-44f3-9bc8-996f0560951b-c000.snappy.parquet
-rw-r--r--   3 itv002480 supergroup  201240876 2022-06-30 03:17 /user/itv002480/mock_test_02/problem09/solution/part-00002-159b18e6-a8c0-44f3-9bc8-996f0560951b-c000.snappy.parquet
-rw-r--r--   3 itv002480 supergroup   91357572 2022-06-30 03:16 /user/itv002480/mock_test_02/problem09/solution/part-00003-159b18e6-a8c0-44f3-9bc8-996f0560951b-c000.snappy.parquet
-rw-r--r--   3 itv002480 supergroup   94865712 2022-06-30 03:16 /user/itv002480/mock_test_02/problem09/solution/part-00004-159b18e6-a8c0-44f3-9b

In [10]:
import getpass
username = getpass.getuser()
data = spark.read. \
  parquet(f'/user/{username}/mock_test_02/problem09/solution')

In [11]:
data.printSchema()

root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: integer (nullable = true)
 |-- endstationid: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- startstationname: string (nullable = true)
 |-- endstationname: string (nullable = true)



In [12]:
data.count()

54462016

In [13]:
data.show()

+------------+--------------------+--------------------+--------------+------------+------+----------+---------+------+-----+----------------+--------------+
|tripduration|           starttime|            stoptime|startstationid|endstationid|bikeid|  usertype|birthyear|gender|month|startstationname|endstationname|
+------------+--------------------+--------------------+--------------+------------+------+----------+---------+------+-----+----------------+--------------+
|         897|2019-07-01 00:00:...|2019-07-01 00:14:...|           493|         454| 18340|Subscriber|     1966|     1|    7|        W 45 St |     6 Ave (1)|
|         267|2019-07-01 00:00:...|2019-07-01 00:04:...|          3143|        3226| 21458|  Customer|     1996|     1|    7|          5 Ave |       E 78 St|
|        2201|2019-07-01 00:00:...|2019-07-01 00:36:...|           317|        3469| 39874|Subscriber|     1986|     1|    7|         E 6 St |      Avenue B|
|        1660|2019-07-01 00:00:...|2019-07-01 00:27: