## Problem 08

Weightage: 25

Get the top 10 bikes by total trip duration for each month in the first quarter of 2019.
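The task has two steps: total the trip durations for each (month, bike) pair, then keep the 10 largest totals per month. The following hypothetical pure-Python reference of that logic can be handy for checking a Spark result on a small sample. It takes a strict top 10 per month, so unlike the `dense_rank` used in the solution it does not keep extra rows for ties at the cutoff. The name `top_bikes_per_month` and the sample rows are illustrative only.

```python
from collections import defaultdict


def top_bikes_per_month(trips, n=10):
    """Hypothetical reference: trips is an iterable of (tripmonth, bikeid, tripduration)."""
    # Step 1: total duration per (month, bike)
    totals = defaultdict(int)
    for month, bike, duration in trips:
        totals[(month, bike)] += duration

    # Step 2: group the totals by month
    by_month = defaultdict(list)
    for (month, bike), total in totals.items():
        by_month[month].append((bike, total))

    # Step 3: months ascending, top n totals per month in descending order
    result = []
    for month in sorted(by_month):
        ranked = sorted(by_month[month], key=lambda bt: bt[1], reverse=True)
        result.extend((month, bike, total) for bike, total in ranked[:n])
    return result


sample = [(201901, 1, 100), (201901, 2, 300), (201901, 1, 250), (201902, 3, 50)]
print(top_bikes_per_month(sample, n=1))  # [(201901, 1, 350), (201902, 3, 50)]
```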

## Data Description
All of the citibike trip data is available under **/public/citibike/trips**. It contains multiple folders, one for each month. Here is the schema.

```
root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: string (nullable = true)
 |-- endstationid: string (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
```
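Because each month lives in its own folder, you can read only the three Q1 folders instead of scanning the rest of 2019. The sketch below assumes the folders are named `month=YYYYMM`, which is what the `month=2019*` glob in the solution suggests. `quarter_paths` is a hypothetical helper.

```python
# Hypothetical helper; assumes month folders named month=YYYYMM under BASE.
BASE = "/public/citibike/trips"


def quarter_paths(year, quarter, base=BASE):
    """Return the three month-folder paths for the given calendar quarter."""
    first_month = 3 * (quarter - 1) + 1
    return [f"{base}/month={year}{m:02d}" for m in range(first_month, first_month + 3)]


paths = quarter_paths(2019, 1)
print(paths)
# In PySpark, DataFrameReader.csv also accepts a list of paths; setting the
# basePath option should keep `month` as a discovered partition column:
# spark.read.option("basePath", BASE).csv(paths, header=True, inferSchema=True)
```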

## Output Requirements

* Place the result in the HDFS directory
```
/user/`whoami`/mock_test_02/problem08/solution
```
* Use CSV format and save the output to exactly one file. Make sure to include the header.
* The column names are listed below. The delimiter should be the tab character.
```
 |-- tripmonth: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- tripduration: integer (nullable = true)
```
* Data should be sorted in ascending order by tripmonth and then in descending order by tripduration.
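The output rules (tab delimiter, header first, month ascending, duration descending) can be illustrated with Python's standard `csv` module. This sketch only shows the expected file layout. It uses three rows taken from the expected output in the Validation section and is not a replacement for the Spark writer. `to_tsv` is a hypothetical name.

```python
import csv
import io


def to_tsv(rows):
    """Render (tripmonth, bikeid, tripduration) rows as tab-separated text with a header."""
    # tripmonth ascending, then tripduration descending
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
    writer.writerow(["tripmonth", "bikeid", "tripduration"])
    writer.writerows(ordered)
    return buf.getvalue()


sample = [(201902, 14903, 1322733), (201901, 34183, 1161233), (201901, 34105, 2704377)]
print(to_tsv(sample))
```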

## Validation

Here are the self validation steps:
* Run the following to check the number of files.
```
hdfs dfs -ls /user/`whoami`/mock_test_02/problem08/solution
```
* Run this code to create a DataFrame named `data` and print its schema.
```
import getpass
username = getpass.getuser()
data = spark.read.csv(f'/user/{username}/mock_test_02/problem08/solution',
                      sep='\t',
                      header=True,
                      inferSchema=True
                     )
data.printSchema()
```
* The schema printed by the previous code should be:
```
root
 |-- tripmonth: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- tripduration: integer (nullable = true)
```
* Run the following to validate the data. It should show 31 or more lines including the header (at least 10 records per month). Compare the result against the expected output below.
```
hdfs dfs -cat /user/`whoami`/mock_test_02/problem08/solution/part*
```
* Expected output
```
tripmonth	bikeid	tripduration
201901	34105	2704377
201901	28660	2410501
201901	19486	2387390
201901	33699	2091471
201901	30321	1818328
201901	20934	1738805
201901	34355	1474421
201901	27128	1397734
201901	32139	1172618
201901	34183	1161233
201902	33883	2422646
201902	21481	1988809
201902	27447	1965938
201902	14858	1463295
201902	35731	1458714
201902	28660	1450403
201902	30143	1401552
201902	29019	1384035
201902	35035	1362512
201902	14903	1322733
201903	30462	2982161
201903	28384	2767163
201903	35680	2606391
201903	25718	1943545
201903	29770	1839760
201903	37947	1630514
201903	34184	1592415
201903	25441	1483435
201903	15506	1462460
201903	31275	1397484
```
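Comparing the output by eye works, but the same checks can be scripted. The hypothetical checker below assumes the file contents have already been fetched as text, for example with `hdfs dfs -cat`. It verifies the header and the sort order and counts rows per month. Each of 201901, 201902 and 201903 should have at least 10 rows.

```python
from collections import Counter

EXPECTED_HEADER = ["tripmonth", "bikeid", "tripduration"]


def check_output(text):
    """Hypothetical checker: validate tab-separated output and return rows per month."""
    lines = text.strip().splitlines()
    if lines[0].split("\t") != EXPECTED_HEADER:
        raise ValueError(f"unexpected header: {lines[0]!r}")
    # int(float(...)) also accepts values written as doubles, e.g. '2704377.0'
    rows = [tuple(int(float(v)) for v in line.split("\t")) for line in lines[1:]]
    sort_keys = [(month, -duration) for month, _, duration in rows]
    if sort_keys != sorted(sort_keys):
        raise ValueError("rows are not sorted by tripmonth asc, tripduration desc")
    return Counter(month for month, _, _ in rows)
```

In practice the `text` argument would be the captured output of the `hdfs dfs -cat` command above.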

```python
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName(f'Problem 08 | {username}'). \
    master('yarn'). \
    getOrCreate()
```

```python
# Lower the shuffle partitions from the default of 200 to keep the task count small
spark.conf.set('spark.sql.shuffle.partitions', 2)
```

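```python
# Reads all 2019 month folders; rows outside Q1 are filtered out later.
# Without inferSchema (or an explicit schema) every column is read as a string.
cityBikePath = "/public/citibike/trips/month=2019*/part-*.csv.gz"
cityBikeDf = spark.read.csv(cityBikePath, header=True)
```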

```python
# Note: importing `sum` here shadows Python's built-in sum in this notebook
from pyspark.sql.functions import date_format, col, dense_rank, sum
from pyspark.sql.window import Window
```

```python
# Derive the trip month (a string such as '201901') and keep only Q1 2019
cityBikeQuarterDf = cityBikeDf.withColumn("tripmonth", date_format("starttime", 'yyyyMM')). \
    filter(col("tripmonth").isin("201901", "201902", "201903"))
```
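`date_format("starttime", 'yyyyMM')` turns each timestamp into a six-character string such as `201901`, which is why the filter compares against strings. Spark's `yyyyMM` pattern corresponds to `%Y%m` in Python's `strftime`. Below is a pure-Python analogue. It assumes the `starttime` text begins with `YYYY-MM-DD HH:MM:SS`.

```python
from datetime import datetime

Q1_2019 = {"201901", "201902", "201903"}


def trip_month(starttime: str) -> str:
    # Assumes starttime begins with 'YYYY-MM-DD HH:MM:SS'; any fractional
    # seconds after that are ignored.
    return datetime.strptime(starttime[:19], "%Y-%m-%d %H:%M:%S").strftime("%Y%m")


print(trip_month("2019-01-05 10:00:00") in Q1_2019)  # True
```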

```python
# Rank bikes within each month by total trip duration, longest first
spec = Window.partitionBy("tripmonth"). \
    orderBy(col("tripduration").desc())
```

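```python
# Total trip duration per bike per month.
# tripduration is still a string here, so Spark casts it implicitly and sum() returns a double.
durationDf = cityBikeQuarterDf.groupBy("tripmonth", "bikeid"). \
    agg(sum(col("tripduration")).alias("tripduration")). \
    orderBy(col('tripmonth'), col('tripduration').desc())
```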

```python
# Keep ranks 1-10 per month; dense_rank lets tied totals share a rank
resultsDf = durationDf.withColumn("rnk", dense_rank().over(spec)). \
    filter(col("rnk") <= 10). \
    select("tripmonth", "bikeid", "tripduration"). \
    orderBy(col("tripmonth"), col("tripduration").desc())
```
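`dense_rank()` gives tied durations the same rank without leaving gaps. As a result, `rnk <= 10` can return more than 10 bikes for a month when totals tie, which is why the validation step allows for more records. `row_number()` would cut at exactly 10. The plain-Python sketch below shows the difference using hypothetical values.

```python
def dense_rank(values):
    """Rank values in descending order; ties share a rank and no ranks are skipped."""
    distinct = sorted(set(values), reverse=True)
    rank_of = {v: i + 1 for i, v in enumerate(distinct)}
    return [rank_of[v] for v in values]


def row_number(values):
    """Number values 1..n in descending order; ties get consecutive numbers."""
    order = sorted(range(len(values)), key=lambda i: values[i], reverse=True)
    ranks = [0] * len(values)
    for pos, i in enumerate(order, start=1):
        ranks[i] = pos
    return ranks


durations = [900, 800, 800, 700]
print(dense_rank(durations))  # [1, 2, 2, 3]
print(row_number(durations))  # [1, 2, 3, 4]
```

With a cutoff of 2, `dense_rank` keeps three rows here while `row_number` keeps two.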

```python
resultsDf
```

```
tripmonth,bikeid,tripduration
201901,34105,2704377.0
201901,28660,2410501.0
201901,19486,2387390.0
201901,33699,2091471.0
201901,30321,1818328.0
201901,20934,1738805.0
201901,34355,1474421.0
201901,27128,1397734.0
201901,32139,1172618.0
201901,34183,1161233.0
```


```python
# coalesce(1) writes a single part file; header and tab separator as required
resultsDf.coalesce(1). \
    write.mode("overwrite"). \
    option('header', 'true'). \
    option("sep", "\t"). \
    csv(f'/user/{username}/mock_test_02/problem08/solution')
```

# Validation

```
%%sh
hdfs dfs -ls /user/`whoami`/mock_test_02/problem08/solution
```

```
Found 2 items
-rw-r--r--   3 itv001477 supergroup          0 2021-12-10 05:42 /user/itv001477/mock_test_02/problem08/solution/_SUCCESS
-rw-r--r--   3 itv001477 supergroup        720 2021-12-10 05:42 /user/itv001477/mock_test_02/problem08/solution/part-00000-00181d7a-a1ed-4d37-820f-fb2b53232a04-c000.csv
```


```python
data = spark.read.csv(f'/user/{username}/mock_test_02/problem08/solution',
                      sep='\t',
                      header=True,
                      inferSchema=True
                     )
```

```python
data.printSchema()
```

```
root
 |-- tripmonth: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- tripduration: double (nullable = true)
```



```
%%sh
hdfs dfs -cat /user/`whoami`/mock_test_02/problem08/solution/part*
```

```
tripmonth	bikeid	tripduration
201901	34105	2704377.0
201901	28660	2410501.0
201901	19486	2387390.0
201901	33699	2091471.0
201901	30321	1818328.0
201901	20934	1738805.0
201901	34355	1474421.0
201901	27128	1397734.0
201901	32139	1172618.0
201901	34183	1161233.0
201902	33883	2422646.0
201902	21481	1988809.0
201902	27447	1965938.0
201902	14858	1463295.0
201902	35731	1458714.0
201902	28660	1450403.0
201902	30143	1401552.0
201902	29019	1384035.0
201902	35035	1362512.0
201902	14903	1322733.0
201903	30462	2982161.0
201903	28384	2767163.0
201903	35680	2606391.0
201903	25718	1943545.0
201903	29770	1839760.0
201903	37947	1630514.0
201903	34184	1592415.0
201903	25441	1483435.0
201903	15506	1462460.0
201903	31275	1397484.0
```
