## Connecting to ADLS

In [None]:
# display(dbutils.fs.ls("abfss://processed@ADLS_name.dfs.core.windows.net"))

### Exploring Data
##### Displaying ts_registration dataset

In [None]:
display(spark.read.csv("abfss://raw@ADLS_name.dfs.core.windows.net/ts_registration.csv",header=True))

### 1) Creating DataFrame

In [None]:
reg_df = spark.read.csv("abfss://raw@ADLS_name.dfs.core.windows.net/ts_registration.csv",header=True,inferSchema=True,escape='"')

In [None]:
display(reg_df)

In [None]:
reg_df.printSchema()

In [None]:
from pyspark.sql.functions import count,when,isnull,col,desc,trim,coalesce

#### 2) Checking Nulls

In [None]:
reg_df.select([count(when(isnull(c),c)).alias(c) for c in reg_df.columns]).show(vertical=True)

### 3) Temp view
##### Creating Temp View to write SQL quries

In [None]:
reg_df.createOrReplaceTempView("ts")

In [None]:
spark.sql("SELECT * FROM ts").show(5)

In [None]:
display(spark.sql("SELECT OfficeCd,count(officeCd) as cnt FROM ts GROUP BY officeCd ORDER BY cnt DESC"))

### 5) Removing Duplicates
##### Removing Duplicates of last registrationNo

In [None]:
te_df = reg_df.orderBy(col("registrationNo").desc()).dropDuplicates(["registrationNo"])

In [None]:
te_df.count()

In [None]:
display(te_df.groupBy("fromDate").agg(count('registrationNo')).orderBy("fromDate"))

##### 5.1) Dropping unwanted columns "reserve_no"

In [None]:
te_df = te_df.drop("reserve_no")

##### 5.2) Dropping unwanted columns "slno"

In [None]:
te_df = te_df.drop("slno")

In [None]:
te_df.select([count(when(isnull(c),c)).alias(c) for c in te_df.columns]).show(vertical=True)

#### 5.3) Dropping rows
##### Dropping rows that dosent have "registrationNo"

In [None]:
display(te_df.filter(col("registrationNo").isNull()))

In [None]:
te_df = te_df.na.drop(subset=["registrationNo"])

In [None]:
te_df.select([count(when(isnull(c),c)).alias(c) for c in te_df.columns]).show(vertical=True)

### 6) Checking nulls in fuel

In [None]:
fuel_df = te_df.filter(col("fuel").isNull())

In [None]:
fuel_df.count()

In [None]:
display(fuel_df.groupBy('modelDesc').agg(count('modelDesc')).orderBy(desc("count(modelDesc)")))

###### While searching these modelDesc are trailors of trucks and tractors etc these are additional parts for trucks and tractors  \
###### These can be removed

In [None]:
drop_df = te_df.dropna(subset=["fuel"])

#### Nulls from the dataset has been cleared

In [None]:
drop_df.select([count(when(isnull(c),c)).alias(c) for c in drop_df.columns]).show(vertical=True)

#### 7) Data quality checks

In [None]:
display(drop_df.groupBy('fuel').count().orderBy(desc("count")))

In [None]:
dq_df = drop_df.withColumn("fuel", when(drop_df.fuel == "Petrol", "PETROL").when(drop_df.fuel == "petrol", "PETROL").otherwise(drop_df.fuel))

In [None]:
dq_df = dq_df.withColumn("fuel", when(trim(dq_df.fuel) == "DIESEL", "DIESEL").otherwise(dq_df.fuel))

###### These are additional parts of a trucks and tractors and does not require fuel 
###### Its better to remove this

In [None]:
display(dq_df.filter(dq_df.fuel == 0))

In [None]:
dq_df = dq_df.filter(dq_df.fuel != '0')

In [None]:
display(dq_df.filter(dq_df.fuel == -1))

##### Removing rows

In [None]:
dq_df = dq_df.filter(dq_df.fuel != '-1')

In [None]:
display(dq_df.groupBy('fuel').count().orderBy(desc("count")))

In [None]:
final_df = dq_df

In [None]:
display(final_df)

#### Write df to Parquet format

In [None]:
final_df.coalesce(1).write.parquet("abfss://processed@ADLS_name.dfs.core.windows.net/tsreg")

In [None]:
par_df = spark.read.parquet("abfss://processed@ADLS_name.dfs.core.windows.net/tsreg")

In [None]:
par_df.count()