# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the already-running SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext built on that SparkContext; later cells read the Glue Data Catalog through it.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext (stopped by the final load cell).
spark = glueContext.spark_session
# Glue Job handle bound to this context. NOTE(review): nothing in the visible cells
# calls job.init()/job.commit() -- confirm whether it is still needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::565258093567:role/s3-glue-role
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 147c5320-447d-4679-9fcb-7f4bca01adf0
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 147c5320-447d-4679

#### Reading data into a Glue DynamicFrame and converting it into a Spark DataFrame


In [2]:
# Read the raw automotive table from the Glue Data Catalog and convert the
# resulting DynamicFrame straight into a Spark DataFrame, so `base_df` is only
# ever bound to a single kind of object.
base_df = glueContext.create_dynamic_frame.from_catalog(
    database='de-automotive-db',
    table_name='de_automotive_raw_ap_south_1',
).toDF()




#### Selecting required columns and filtering data


In [3]:
# Columns carried forward into the cleaned data set.
required_columns = [
    'vin',
    'firstseen',
    'msrp',
    'lastseen',
    'askprice',
    'mileage',
    'isnew',
    'brandname',
    'dealerid',
    'vf_destinationmarket',
    'vf_make',
    'vf_model',
    'vf_modelyear',
]

# Keep only the required columns, then only the listings flagged as not new.
used_veh_df = (
    base_df
    .select(*required_columns)
    .where(base_df.isnew == 'FALSE')
)
used_veh_df.show(5)

+--------------------+----------+-----+----------+--------+-------+-----+----------+--------+--------------------+----------+--------------+------------+
|                 vin| firstseen| msrp|  lastseen|askprice|mileage|isnew| brandname|dealerid|vf_destinationmarket|   vf_make|      vf_model|vf_modelyear|
+--------------------+----------+-----+----------+--------+-------+-----+----------+--------+--------------------+----------+--------------+------------+
|abc5f0360059cf7b6...|2019-05-06| 1498|2019-05-06|    1498|      0|false|MITSUBISHI|    7514|                    |MITSUBISHI|Eclipse Spyder|        2002|
|e24402cc77f6fd2d6...|2019-05-06|10589|2019-05-06|   10589|      0|false|    NISSAN|    7514|                    |    NISSAN|        Altima|        2016|
|1c5a8dc966b3d3b37...|2017-06-03|11992|2019-05-07|    9940|      0|false|      FORD|    7514|                    |      FORD|        Escape|        2014|
|edce23814c88f5a1d...|2019-05-06|12387|2019-05-07|   12387|      0|false| CH

In [4]:
from pyspark.sql.functions import when

# Label every listing with a 20k-wide mileage band.
#
# The top band is matched explicitly (mileage >= 80000) instead of through a
# catch-all otherwise(): with a catch-all, rows whose mileage is NULL or
# negative were also labelled ">80K", silently turning missing/invalid data
# into "high mileage". With no otherwise() clause, rows that match none of
# the conditions get a NULL mileage_bucket.
mileage_col = used_veh_df.mileage
used_veh_df = used_veh_df.withColumn(
    "mileage_bucket",
    when((mileage_col >= 0) & (mileage_col < 20000), "Btw 0-20k")
    .when((mileage_col >= 20000) & (mileage_col < 40000), "Btw 20-40k")
    .when((mileage_col >= 40000) & (mileage_col < 60000), "Btw 40-60k")
    .when((mileage_col >= 60000) & (mileage_col < 80000), "Btw 60-80k")
    .when(mileage_col >= 80000, ">80K"),
)
used_veh_df.show(10)

+--------------------+----------+-----+----------+--------+-------+-----+----------+--------+--------------------+----------+--------------+------------+--------------+
|                 vin| firstseen| msrp|  lastseen|askprice|mileage|isnew| brandname|dealerid|vf_destinationmarket|   vf_make|      vf_model|vf_modelyear|mileage_bucket|
+--------------------+----------+-----+----------+--------+-------+-----+----------+--------+--------------------+----------+--------------+------------+--------------+
|abc5f0360059cf7b6...|2019-05-06| 1498|2019-05-06|    1498|      0|false|MITSUBISHI|    7514|                    |MITSUBISHI|Eclipse Spyder|        2002|     Btw 0-20k|
|e24402cc77f6fd2d6...|2019-05-06|10589|2019-05-06|   10589|      0|false|    NISSAN|    7514|                    |    NISSAN|        Altima|        2016|     Btw 0-20k|
|1c5a8dc966b3d3b37...|2017-06-03|11992|2019-05-07|    9940|      0|false|      FORD|    7514|                    |      FORD|        Escape|        2014|  

#### Backfilling and Frontfilling askprice
##### Populating a missing price from the price seen on the next date or on the previous date for the same vin

In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,col,lead,lag,coalesce
from pyspark.sql import functions as F

# Observations of one vehicle, ordered by the date the listing was last seen.
price_cnd = Window.partitionBy("vin").orderBy("lastseen")
# Frames covering every later / every earlier observation of the same vin.
later_rows = price_cnd.rowsBetween(1, Window.unboundedFollowing)
earlier_rows = price_cnd.rowsBetween(Window.unboundedPreceding, -1)

# Fill a missing askprice from the same vin's other observations.
# Candidates are tried in this order:
#   1. the row's own askprice
#   2. the immediately following observation (back-fill)
#   3. the immediately preceding observation (forward-fill)
#   4. the nearest later observation that has a price
#   5. the nearest earlier observation that has a price
# Steps 4-5 only matter inside runs of consecutive missing prices, where the
# adjacent rows are themselves NULL; looking one row away (steps 2-3 alone)
# leaves the middle of such a run unfilled.
filled_askprice = coalesce(
    col('askprice'),
    lead('askprice').over(price_cnd),
    lag('askprice').over(price_cnd),
    F.first('askprice', ignorenulls=True).over(later_rows),
    F.last('askprice', ignorenulls=True).over(earlier_rows),
)

used_veh_df = used_veh_df.select(
    'vin',
    'firstseen',
    'msrp',
    filled_askprice.alias('askprice'),
    'lastseen',
    'mileage',
    'isnew',
    'brandname',
    'dealerid',
    'vf_destinationmarket',
    'vf_make',
    'vf_model',
    'vf_modelyear',
    'mileage_bucket',
)

used_veh_df.show(10)

+--------------------+----------+-----+--------+----------+-------+-----+---------+--------+--------------------+--------+--------+------------+--------------+
|                 vin| firstseen| msrp|askprice|  lastseen|mileage|isnew|brandname|dealerid|vf_destinationmarket| vf_make|vf_model|vf_modelyear|mileage_bucket|
+--------------------+----------+-----+--------+----------+-------+-----+---------+--------+--------------------+--------+--------+------------+--------------+
|000491cd8e360805f...|2019-05-27| 3948|    3463|2019-06-13| 157458|false|   NISSAN|    5467|                    |  NISSAN|   Quest|        2006|          >80K|
|0004d5cfd9641a932...|2018-12-23|28000|   28000|2019-01-25|  46547|false|     FORD|   27014|                    |    FORD|   F-150|        2015|    Btw 40-60k|
|0004d5cfd9641a932...|2018-12-23|28000|   28000|2019-01-26|  46547|false|     FORD|   22260|                    |    FORD|   F-150|        2015|    Btw 40-60k|
|0004d5cfd9641a932...|2018-12-24|28000| 

#### For each vin, picking last seen record as sales record

In [12]:
# Rank each vin's observations newest-first (ties broken by the higher ask
# price) and keep only the top-ranked row as that vehicle's sales record.
windowSpec = (
    Window
    .partitionBy("vin")
    .orderBy(col("lastSeen").desc(), col("askprice").desc())
)

sold_veh_df = (
    used_veh_df
    .withColumn("rn", row_number().over(windowSpec))
    .where(col("rn") == 1)
    .drop("rn")
)
sold_veh_df.show(10)

+--------------------+----------+-----+--------+----------+-------+-----+---------+--------+--------------------+---------+----------+------------+--------------+
|                 vin| firstseen| msrp|askprice|  lastseen|mileage|isnew|brandname|dealerid|vf_destinationmarket|  vf_make|  vf_model|vf_modelyear|mileage_bucket|
+--------------------+----------+-----+--------+----------+-------+-----+---------+--------+--------------------+---------+----------+------------+--------------+
|000491cd8e360805f...|2019-05-27| 3948|    3463|2019-06-13| 157458|false|   NISSAN|    5467|                    |   NISSAN|     Quest|        2006|          >80K|
|0004d5cfd9641a932...|2018-12-23|28000|   28000|2019-01-26|  46547|false|     FORD|   22260|                    |     FORD|     F-150|        2015|    Btw 40-60k|
|0006bbacb21021d05...|2019-12-29|13995|   10936|2020-04-25|      0|false| CHRYSLER|   26384|                    | CHRYSLER|       200|        2017|     Btw 0-20k|
|000896fbfc5b684c9...|

#### Load data into processed S3 bucket 


In [14]:
# Destination prefix in the processed bucket; existing contents are overwritten.
output_path = "s3a://de-automotive-cleaned-ap-south-1/transformed_data"

try:
    # Write the Spark DataFrame to S3 in Parquet format
    sold_veh_df.write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Report the failure, then re-raise: swallowing the exception would make
    # the cell (and any run built from it) look successful even though
    # nothing was written.
    print(f"Failed to write transformed data to {output_path}: {e}")
    raise
else:
    print('Transformed data loaded successfully in S3 Processed Bucket')
    # Stop the Spark session only after a successful write, so a failed run
    # can still be inspected and retried in the same session.
    spark.stop()

Transformed data loaded successfully in S3 Processed Bucket
