In [0]:
# Mount the ADLS Gen2 container `oladatacontainer` (storage account `dbdaproject`)
# at /mnt/dbdaproject using OAuth client-credentials (service principal) auth.
#
# The client secret is read from a Databricks secret scope instead of being
# hardcoded in the notebook.
# NOTE(review): the client secret that was previously pasted into this cell must be
# treated as leaked and rotated. The scope/key names below are placeholders —
# create them (e.g. with the Databricks CLI) before running. TODO confirm names.
SECRET_SCOPE = "dbdaproject"
CLIENT_SECRET_KEY = "sp-client-secret"

CLIENT_ID = "c2892b2a-6713-4289-b112-520c765f26c6"   # application (client) id
TENANT_ID = "dfaa8ecd-f8e5-47ff-b8b4-8b378c43fdc2"   # directory (tenant) id
MOUNT_POINT = "/mnt/dbdaproject"
STORAGE_SOURCE = "abfss://oladatacontainer@dbdaproject.dfs.core.windows.net"

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": CLIENT_ID,
  "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope=SECRET_SCOPE, key=CLIENT_SECRET_KEY),
  "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token"
}

# Unmount the directory only if it is already mounted: an unconditional
# dbutils.fs.unmount() fails when the mount point does not exist (e.g. fresh workspace).
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
  dbutils.fs.unmount(MOUNT_POINT)

# Mount the directory (returns True on success, shown as the cell output)
dbutils.fs.mount(
  source=STORAGE_SOURCE,
  mount_point=MOUNT_POINT,
  extra_configs=configs
)

/mnt/dbdaproject has been unmounted.


True

In [0]:
# List the top level of the mounted container (Python equivalent of `%fs ls`);
# display() renders path, name, size and modificationTime as a table.
display(dbutils.fs.ls("/mnt/dbdaproject"))

path,name,size,modificationTime
dbfs:/mnt/dbdaproject/_$azuretmpfolder$/,_$azuretmpfolder$/,0,1708085503000
dbfs:/mnt/dbdaproject/cluster_data/,cluster_data/,0,1708171953000
dbfs:/mnt/dbdaproject/preprocessedFinal,preprocessedFinal,413217819,1708078975000
dbfs:/mnt/dbdaproject/rawdata1/,rawdata1/,0,1707983062000
dbfs:/mnt/dbdaproject/transformeddata1/,transformeddata1/,0,1707983076000


In [0]:
# Echo the notebook's SparkSession (`spark` is provided by the notebook
# environment, not created in this notebook) as a quick check that one is attached.
spark

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType

In [0]:
# Load the raw ride data from the mounted container. The first line is used as the
# header; with no schema or inferSchema option, every column is read as a string
# and is cast to proper types in a later cell.
df = spark.read.csv("/mnt/dbdaproject/rawdata1/raw_dataN.csv", header=True)

In [0]:
# Count the total number of rows in the raw (untyped, un-deduplicated) data.
df.count()

8381556

In [0]:
# Show the first 5 rows of the raw data (all columns are still strings here,
# e.g. `number` keeps leading zeros such as "05408").
df.show(5)

+-------------------+------+----------+-----------------+---------+-----------------+
|                 ts|number|  pick_lat|         pick_lng| drop_lat|         drop_lng|
+-------------------+------+----------+-----------------+---------+-----------------+
|2020-03-26 07:07:17| 14626|12.3136215|76.65819499999998|12.287301|76.60228000000002|
|2020-03-26 07:32:27| 85490| 12.943947|        77.560745|12.954014|         77.54377|
|2020-03-26 07:36:44| 05408| 12.899603|          77.5873| 12.93478|         77.56995|
|2020-03-26 07:38:00| 58940| 12.918229|77.60754399999998|12.968971|        77.636375|
|2020-03-26 07:39:29| 05408|  12.89949|77.58726999999998| 12.93478|         77.56995|
+-------------------+------+----------+-----------------+---------+-----------------+
only showing top 5 rows



In [0]:
# Show the schema of the raw data; because the CSV was read without a schema,
# every column is a string.
df.printSchema()

root
 |-- ts: string (nullable = true)
 |-- number: string (nullable = true)
 |-- pick_lat: string (nullable = true)
 |-- pick_lng: string (nullable = true)
 |-- drop_lat: string (nullable = true)
 |-- drop_lng: string (nullable = true)



In [0]:
# Cast the all-string CSV columns to their intended Spark types:
# ts -> timestamp, number -> int, coordinates -> double.
# NOTE(review): casting `number` to int drops leading zeros (raw "05408" becomes
# 5408), so ids differing only in leading zeros would collide. Values that do not
# parse become null rather than raising (when ANSI mode is off); the nulls found
# in `number` in a later cell may come from such failed casts — confirm the ids
# are purely numeric, or consider keeping `number` as a string.
target_types = {
    "ts": "timestamp",
    "number": "int",
    "pick_lat": "double",
    "pick_lng": "double",
    "drop_lat": "double",
    "drop_lng": "double",
}
for column_name, spark_type in target_types.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

In [0]:
# Confirm the casts: ts is a timestamp, number an integer, coordinates doubles.
df.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- number: integer (nullable = true)
 |-- pick_lat: double (nullable = true)
 |-- pick_lng: double (nullable = true)
 |-- drop_lat: double (nullable = true)
 |-- drop_lng: double (nullable = true)



In [0]:
# Prepare for de-duplication on (ts, number). Nothing is sorted here: this cell
# only tags each row with an id and defines the window used by the duplicate cells.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Tag every row with a unique, increasing (but not consecutive) id so that
# occurrences within the same (ts, number) group can be told apart.
dfP = df.select("*", F.monotonically_increasing_id().alias("row_id"))

# One window partition per (ts, number) pair, rows ordered by row_id ascending.
window_spec = Window.partitionBy("ts", "number").orderBy(F.col("row_id").asc())

In [0]:
# Check that the non-nullable long `row_id` column was appended to the typed columns.
dfP.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- number: integer (nullable = true)
 |-- pick_lat: double (nullable = true)
 |-- pick_lng: double (nullable = true)
 |-- drop_lat: double (nullable = true)
 |-- drop_lng: double (nullable = true)
 |-- row_id: long (nullable = false)



In [0]:
# A Customer_ID number at a particular timestamp can only have one entry, so any
# repeat of a (ts, number) pair is a duplicate.
#
# `window_spec` orders each (ts, number) group by row_id. A window aggregate with an
# ORDER BY uses a cumulative frame by default; the frame is written out here so it is
# visible that `duplicate_count` is a *running* count (1 for the first occurrence,
# 2 for the second, ...). Filtering on > 1 therefore keeps only the repeat
# occurrences — the first copy of each duplicated pair is not included.
cumulative_frame = window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
duplicates_dfP = (
    dfP
    .withColumn("duplicate_count", F.count("*").over(cumulative_frame))
    .where(F.col("duplicate_count") > 1)
    .drop("row_id", "duplicate_count")
)

In [0]:
# Show a sample of the rows flagged as duplicates (show() prints 20 rows by default).
duplicates_dfP.show()

+-------------------+------+------------------+-----------------+----------+-----------------+
|                 ts|number|          pick_lat|         pick_lng|  drop_lat|         drop_lng|
+-------------------+------+------------------+-----------------+----------+-----------------+
|2020-03-26 10:02:29| 67169|         12.896238|        77.631805|12.9824295|         77.59451|
|2020-03-26 10:02:29| 67169|         12.896238|        77.631805|12.9824295|         77.59451|
|2020-03-26 10:02:29| 67169|         12.896238|        77.631805|12.9824295|         77.59451|
|2020-03-26 12:15:34| 40122|         12.973054|         77.56944| 12.952474|        77.543625|
|2020-03-26 13:30:20| 88059|         12.962181|77.59685999999998| 12.926891|77.60968000000003|
|2020-03-26 17:40:36| 89095|        12.9526825|77.55348000000002| 12.943311|         77.54301|
|2020-03-26 18:39:10| 27108|         12.994112|        77.679565| 12.929583|         77.56739|
|2020-03-26 22:08:21| 68488|         12.937589|   

In [0]:
# Count the flagged rows, i.e. the repeat occurrences of a (ts, number) pair.
total_rows = duplicates_dfP.count()
# Print the total count of duplicate rows
print("Total Rows in duplicates_df:", total_rows)

Total Rows in duplicates_df: 66058


In [0]:
# Remove duplicate entries on ['ts', 'number'], keeping the LAST occurrence of each
# pair in input order (the largest row_id).
# Bug fix: numbering rows over the ascending `window_spec` and keeping row 1 kept
# the FIRST occurrence, contradicting the stated intent. Ordering by row_id
# descending puts the last occurrence at row_number 1.
# (Spark DataFrames have no index, so there is nothing to reset afterwards.)
# NOTE: this cell drops `row_id`, so re-running it on its own fails; re-run from
# the cell that creates `row_id`.
keep_last_window = Window.partitionBy("ts", "number").orderBy(F.col("row_id").desc())
dfP = (
    dfP
    .withColumn("row_num", F.row_number().over(keep_last_window))
    .filter(F.col("row_num") == 1)
    .drop("row_id", "row_num")
)

In [0]:
# Top 5 rows of the de-duplicated data. show() defaults to 20 rows, so the count
# is passed explicitly to match the comment and the other preview cells.
# Row order is whatever Spark produces; no sort is applied.
dfP.show(5)

+-------------------+------+---------+-----------------+---------+-----------------+
|                 ts|number| pick_lat|         pick_lng| drop_lat|         drop_lng|
+-------------------+------+---------+-----------------+---------+-----------------+
|2020-03-26 01:23:31| 35021|12.934723|         77.61561|12.916122|         77.61019|
|2020-03-26 01:27:29| 90067|12.930832|77.61249000000002|13.197997|         77.68921|
|2020-03-26 02:33:57| 55464|12.977527|77.57096999999997|12.943489|77.57944499999998|
|2020-03-26 03:19:28| 22388|12.986165|        77.553444|12.946916|         77.56795|
|2020-03-26 06:18:06| 65683| 12.99526|        77.684845|12.977277|        77.683235|
|2020-03-26 06:35:44|  4531|12.997383|         77.58623|12.985202|77.53338000000002|
|2020-03-26 06:37:49|  4531|12.997383|         77.58623|12.985202|77.53338000000002|
|2020-03-26 06:55:25| 86493|12.934031|         77.55274|12.981714|        77.628555|
|2020-03-26 06:57:49| 95953|12.928474|         77.59107|12.935624

In [0]:
# Count the rows remaining after de-duplication on (ts, number)
total_rows_result_df = dfP.count()

# Print the total count
print("Total Rows in result_df:", total_rows_result_df)

Total Rows in result_df: 8315498


In [0]:
# Schema after de-duplication: the helper columns (row_id and the row counter) are gone.
dfP.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- number: integer (nullable = true)
 |-- pick_lat: double (nullable = true)
 |-- pick_lng: double (nullable = true)
 |-- drop_lat: double (nullable = true)
 |-- drop_lng: double (nullable = true)



In [0]:
# Count null values per column: isNull() is cast to 0/1 and summed over all rows,
# giving a one-row table with one count per column.
null_count_exprs = [
    F.sum(F.col(column_name).isNull().cast("integer")).alias(column_name)
    for column_name in dfP.columns
]
dfP.select(null_count_exprs).show()

+---+------+--------+--------+--------+--------+
| ts|number|pick_lat|pick_lng|drop_lat|drop_lng|
+---+------+--------+--------+--------+--------+
|  0|   116|       0|       0|       0|       0|
+---+------+--------+--------+--------+--------+



In [0]:
# Drop every row that has a null in any column (in the recorded run only `number`
# had nulls). The remaining row count is kept in `remaining_rows_count`.
dfP = dfP.dropna(how="any")
remaining_rows_count = dfP.count()

In [0]:
# Re-check null values per column after na.drop(); every count should now be 0.
null_counts_after_drop = dfP.select(
    [F.sum(F.col(c).isNull().cast("integer")).alias(c) for c in dfP.columns]
)
null_counts_after_drop.show()

# Enforce the invariant instead of relying on eyeballing the table above.
# On an empty DataFrame sum() yields null, so a None count is treated as 0.
remaining_nulls = null_counts_after_drop.first().asDict()
assert all(not count for count in remaining_nulls.values()), (
    f"null values remain after na.drop(): {remaining_nulls}"
)

+---+------+--------+--------+--------+--------+
| ts|number|pick_lat|pick_lng|drop_lat|drop_lng|
+---+------+--------+--------+--------+--------+
|  0|     0|       0|       0|       0|       0|
+---+------+--------+--------+--------+--------+



In [0]:
# Print the schema to confirm the column types are unchanged after dropping null rows
dfP.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- number: integer (nullable = true)
 |-- pick_lat: double (nullable = true)
 |-- pick_lng: double (nullable = true)
 |-- drop_lat: double (nullable = true)
 |-- drop_lng: double (nullable = true)



In [0]:
# Break the `ts` timestamp (already cast in an earlier cell) into integer
# time/calendar feature columns, appended in the order listed below.
from pyspark.sql.functions import hour, minute, dayofmonth, month, year, dayofweek

time_feature_extractors = {
    "hour": hour,
    "mins": minute,
    "day": dayofmonth,
    "month": month,
    "year": year,
    "dayofweek": dayofweek,  # Spark convention: 1 = Sunday ... 7 = Saturday
}
for feature_name, extract in time_feature_extractors.items():
    dfP = dfP.withColumn(feature_name, extract("ts"))

In [0]:
# Preview the first 5 rows including the derived time feature columns.
dfP.show(5)

+-------------------+------+---------+-----------------+---------+-----------------+----+----+---+-----+----+---------+
|                 ts|number| pick_lat|         pick_lng| drop_lat|         drop_lng|hour|mins|day|month|year|dayofweek|
+-------------------+------+---------+-----------------+---------+-----------------+----+----+---+-----+----+---------+
|2020-03-26 01:23:31| 35021|12.934723|         77.61561|12.916122|         77.61019|   1|  23| 26|    3|2020|        5|
|2020-03-26 01:27:29| 90067|12.930832|77.61249000000002|13.197997|         77.68921|   1|  27| 26|    3|2020|        5|
|2020-03-26 02:33:57| 55464|12.977527|77.57096999999997|12.943489|77.57944499999998|   2|  33| 26|    3|2020|        5|
|2020-03-26 03:19:28| 22388|12.986165|        77.553444|12.946916|         77.56795|   3|  19| 26|    3|2020|        5|
|2020-03-26 06:18:06| 65683| 12.99526|        77.684845|12.977277|        77.683235|   6|  18| 26|    3|2020|        5|
+-------------------+------+---------+--

In [0]:
#count total number of rows after null removal and feature extraction
dfP.count()

8315382

In [0]:
# Final schema: the typed source columns plus the six integer time feature columns.
dfP.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- number: integer (nullable = true)
 |-- pick_lat: double (nullable = true)
 |-- pick_lng: double (nullable = true)
 |-- drop_lat: double (nullable = true)
 |-- drop_lng: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- mins: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)



In [0]:
# Export the preprocessed data to the mounted storage as CSV with a header row,
# replacing any previous output at that path. coalesce(1) funnels all rows through
# a single task so the output directory holds one part-*.csv data file; this is
# convenient for consumers but does not parallelize the write.
output_path = "/mnt/dbdaproject/transformeddata1/preprocessed1"
dfP.coalesce(1).write.csv(output_path, mode="overwrite", header=True)