<a href="https://colab.research.google.com/github/sohakhalid/SohaKhalid-Data-Engineering-BWF/blob/main/Pyspark_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark
# Import PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, hour, col
from pyspark.sql.types import TimestampType

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=75159fbf1343b7cc09ff75bef820cd5f2a1326d919fa7ddadd473e999aaa2b26
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [3]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("BWF-PySparkTask") \
    .getOrCreate()

In [11]:
# Path to your CSV file
file_path = '/data.csv'

In [12]:
# Read CSV into DataFrame
orders_data = spark.read.csv(file_path, header=True, inferSchema=True)

# Display the DataFrame with reordered columns
orders_data.show()

+-------------------+--------+--------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|         order_date|order_id|             product|   product_id|    category| purchase_address|quantity_ordered|  price_each|cost_price|turnover|  margin|
+-------------------+--------+--------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|2023-01-22 21:25:00|  141234|              iPhone|5638008983335|   Vêtements| """944 Walnut St|          Boston| MA 02215"""|         1|   700.0|   231.0|
|2023-01-28 14:15:00|  141235|Lightning Chargin...|5563319511488|Alimentation|  """185 Maple St|        Portland| OR 97035"""|         1|   14.95|   7.475|
|2023-01-17 13:33:00|  141236|    Wired Headphones|2113973395220|   Vêtements|  """538 Adams St|   San Francisco| CA 94016"""|         2|   11.99|   5.995|
|2023-01-05 20:33:00|  141237|    27in FHD Monitor|3069156759167

In [13]:
# Add time_of_day column based on the hour
orders_data = orders_data.withColumn(
    "time_of_day",
    when((hour(col("order_date")) >= 5) & (hour(col("order_date")) < 12), "morning")
    .when((hour(col("order_date")) >= 12) & (hour(col("order_date")) < 18), "afternoon")
    .when((hour(col("order_date")) >= 18) & (hour(col("order_date")) < 24), "evening")
    .otherwise("night")
)

# Reorder columns to place 'time_of_day' at index 1
columns = orders_data.columns
desired_index = 1
# Remove 'time_of_day' from columns list to avoid duplication
columns.remove('time_of_day')
# Insert 'time_of_day' at the desired index
new_columns = columns[:desired_index] + ['time_of_day'] + columns[desired_index:]

# Reorder the DataFrame columns
orders_data = orders_data.select(new_columns)

# Display the DataFrame with reordered columns
orders_data.show(truncate=False)

+-------------------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|order_date         |time_of_day|order_id|product                   |product_id   |category    |purchase_address |quantity_ordered|price_each  |cost_price|turnover|margin  |
+-------------------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|2023-01-22 21:25:00|evening    |141234  |iPhone                    |5638008983335|Vêtements   |"""944 Walnut St | Boston         | MA 02215"""|1         |700.0   |231.0   |
|2023-01-28 14:15:00|afternoon  |141235  |Lightning Charging Cable  |5563319511488|Alimentation|"""185 Maple St  | Portland       | OR 97035"""|1         |14.95   |7.475   |
|2023-01-17 13:33:00|afternoon  |141236  |Wired Headphones          |2113973395220|Vêtements   |"""538 Adams St  | San Francisco  

In [14]:
from pyspark.sql.functions import hour, col

# Extract the hour from order_date and filter out rows where the hour is between 0 and 5 inclusive
orders_data = orders_data.filter((hour(col("order_date")) < 0) | (hour(col("order_date")) > 5))

orders_data.show(truncate=False)

+-------------------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|order_date         |time_of_day|order_id|product                   |product_id   |category    |purchase_address |quantity_ordered|price_each  |cost_price|turnover|margin  |
+-------------------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|2023-01-22 21:25:00|evening    |141234  |iPhone                    |5638008983335|Vêtements   |"""944 Walnut St | Boston         | MA 02215"""|1         |700.0   |231.0   |
|2023-01-28 14:15:00|afternoon  |141235  |Lightning Charging Cable  |5563319511488|Alimentation|"""185 Maple St  | Portland       | OR 97035"""|1         |14.95   |7.475   |
|2023-01-17 13:33:00|afternoon  |141236  |Wired Headphones          |2113973395220|Vêtements   |"""538 Adams St  | San Francisco  

In [15]:
from pyspark.sql.functions import to_date

# Convert order_date column from timestamp to date
orders_data = orders_data.withColumn("order_date", to_date(col("order_date")))

orders_data.show(truncate=False)

+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|order_date|time_of_day|order_id|product                   |product_id   |category    |purchase_address |quantity_ordered|price_each  |cost_price|turnover|margin  |
+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|2023-01-22|evening    |141234  |iPhone                    |5638008983335|Vêtements   |"""944 Walnut St | Boston         | MA 02215"""|1         |700.0   |231.0   |
|2023-01-28|afternoon  |141235  |Lightning Charging Cable  |5563319511488|Alimentation|"""185 Maple St  | Portland       | OR 97035"""|1         |14.95   |7.475   |
|2023-01-17|afternoon  |141236  |Wired Headphones          |2113973395220|Vêtements   |"""538 Adams St  | San Francisco  | CA 94016"""|2         |11.99   |5.995   |
|2023-01-0

In [16]:
# Remove rows where the product column has the value "TV"
orders_data = orders_data.filter(col("product") != "TV")

In [17]:
# Show the final DataFrame
orders_data.show(truncate=False)

+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|order_date|time_of_day|order_id|product                   |product_id   |category    |purchase_address |quantity_ordered|price_each  |cost_price|turnover|margin  |
+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|2023-01-22|evening    |141234  |iPhone                    |5638008983335|Vêtements   |"""944 Walnut St | Boston         | MA 02215"""|1         |700.0   |231.0   |
|2023-01-28|afternoon  |141235  |Lightning Charging Cable  |5563319511488|Alimentation|"""185 Maple St  | Portland       | OR 97035"""|1         |14.95   |7.475   |
|2023-01-17|afternoon  |141236  |Wired Headphones          |2113973395220|Vêtements   |"""538 Adams St  | San Francisco  | CA 94016"""|2         |11.99   |5.995   |
|2023-01-0

In [18]:
from pyspark.sql.functions import lower

# Ensure all values in the purchase_state column are lowercase
orders_data = orders_data.withColumn("category", lower(col("category")))

In [19]:
# Show the final DataFrame
orders_data.show(truncate=False)

+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|order_date|time_of_day|order_id|product                   |product_id   |category    |purchase_address |quantity_ordered|price_each  |cost_price|turnover|margin  |
+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+
|2023-01-22|evening    |141234  |iPhone                    |5638008983335|vêtements   |"""944 Walnut St | Boston         | MA 02215"""|1         |700.0   |231.0   |
|2023-01-28|afternoon  |141235  |Lightning Charging Cable  |5563319511488|alimentation|"""185 Maple St  | Portland       | OR 97035"""|1         |14.95   |7.475   |
|2023-01-17|afternoon  |141236  |Wired Headphones          |2113973395220|vêtements   |"""538 Adams St  | San Francisco  | CA 94016"""|2         |11.99   |5.995   |
|2023-01-0

In [20]:
from pyspark.sql.functions import substring

# State is the 7th and 8th last characters
orders_data = orders_data.withColumn(
    "state",
    substring("purchase_address", -8, 2)
)

In [21]:
# Show the final DataFrame
orders_data.show(truncate=False)

+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+-----+
|order_date|time_of_day|order_id|product                   |product_id   |category    |purchase_address |quantity_ordered|price_each  |cost_price|turnover|margin  |state|
+----------+-----------+--------+--------------------------+-------------+------------+-----------------+----------------+------------+----------+--------+--------+-----+
|2023-01-22|evening    |141234  |iPhone                    |5638008983335|vêtements   |"""944 Walnut St | Boston         | MA 02215"""|1         |700.0   |231.0   |al   |
|2023-01-28|afternoon  |141235  |Lightning Charging Cable  |5563319511488|alimentation|"""185 Maple St  | Portland       | OR 97035"""|1         |14.95   |7.475   |Ma   |
|2023-01-17|afternoon  |141236  |Wired Headphones          |2113973395220|vêtements   |"""538 Adams St  | San Francisco  | CA 94016"""|2         

In [23]:
# Path to save the Parquet file
parquet_path = '/orders_data.parquet'

# Save DataFrame as Parquet
orders_data.write.mode('overwrite').parquet(parquet_path)
