In [2]:
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType


In [None]:
# parameters
# Base directory of the raw sales drops. It is scanned for sub-folders named
# YYYY-MM-DD and the newest one is used as the input location.
# NOTE(review): absolute, machine-specific path — confirm it is overridden
# (e.g. by a notebook parameter runner) when run on another machine.
raw_data = "/Users/shivacharan/retail_sales_analytics/data/raw/sales/"

In [4]:
def get_latest_dated_folder(base_path):
    """Return the path of the newest date-named sub-folder of ``base_path``.

    Only direct sub-directories whose name parses with the ``%Y-%m-%d``
    format are considered; everything else is ignored.

    Args:
        base_path: Directory that contains the dated sub-folders.

    Returns:
        ``base_path`` joined with the name of the sub-folder carrying the
        latest date.

    Raises:
        ValueError: If no sub-directory has a date-formatted name.
    """
    def _parse_date(name):
        # Returns the parsed datetime, or None when the name is not a date.
        try:
            return datetime.strptime(name, "%Y-%m-%d")
        except ValueError:
            return None

    candidates = []
    for entry in os.listdir(base_path):
        if not os.path.isdir(os.path.join(base_path, entry)):
            continue
        parsed = _parse_date(entry)
        if parsed is not None:
            candidates.append((parsed, entry))

    if not candidates:
        raise ValueError("No date-formatted folders found")

    # Compare on the parsed date only, so ties keep listing order.
    _, newest_name = max(candidates, key=lambda pair: pair[0])
    return os.path.join(base_path, newest_name)


def get_latest_file(dir_path):
    """Return the most recently modified data file directly inside ``dir_path``.

    Only regular files are considered (sub-directories are ignored, the
    search is not recursive). Names starting with "." or "_" — for example
    ``.DS_Store``, ``.part-0000.crc`` or ``_SUCCESS`` — are skipped so that a
    bookkeeping file touched after the data landed is never returned as the
    newest input.

    Args:
        dir_path: Directory to search.

    Returns:
        The joined path of the candidate with the greatest modification time,
        or ``None`` when there is no candidate or the directory cannot be
        read.
    """
    try:
        candidates = [
            os.path.join(dir_path, name)
            for name in os.listdir(dir_path)
            if not name.startswith((".", "_"))
            and os.path.isfile(os.path.join(dir_path, name))
        ]
        if not candidates:
            return None
        return max(candidates, key=os.path.getmtime)
    except OSError as e:
        # Filesystem problems (missing directory, permissions, a file removed
        # between listing and stat) stay best-effort: report and return None.
        # Other exception types indicate a programming error and propagate.
        print(f"An error occurred: {e}")
        return None


In [6]:
# Locate the newest dated folder under raw_data and the most recently
# modified file inside it.
latest_folder = get_latest_dated_folder(raw_data)
latest_file = get_latest_file(latest_folder)

spark = (
    SparkSession.builder
    .appName("RetailSalesProcessing")
    .getOrCreate()
)

# Fail fast: every later cell needs `df`, so carrying on without an input
# file would only surface further down as an unrelated NameError.
if not latest_file:
    raise FileNotFoundError(f"No files found in directory: {latest_folder}")

# Read the latest CSV file into a DataFrame (first row is the header, column
# types are inferred from the data).
df = spark.read.csv(latest_file, header=True, inferSchema=True)
df.printSchema()


25/06/24 15:59:26 WARN Utils: Your hostname, Shivas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.68.54 instead (on interface en0)
25/06/24 15:59:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/24 15:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- SALES: double (nullable = true)
 |-- ORDERDATE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: integer (nullable = true)
 |-- MONTH_ID: integer (nullable = true)
 |-- YEAR_ID: integer (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable = tr

In [7]:
# Remove the QTR_ID and TERRITORY columns from the frame.
columns_to_drop = ("QTR_ID", "TERRITORY")
df = df.drop(*columns_to_drop)

In [8]:
# Add DATE: the part of ORDERDATE before the first space (the value stays a string).
order_date_parts = split(col("ORDERDATE"), " ")
df = df.withColumn("DATE", order_date_parts.getItem(0))

In [9]:
# The date part of ORDERDATE has already been copied into DATE, so the
# original column is dropped.
df = df.drop("ORDERDATE")

In [10]:
# Preview the raw phone values next to their country before cleaning.
phone_preview = df.select(col("PHONE"), col("COUNTRY"))
phone_preview.show()

+----------------+---------+
|           PHONE|  COUNTRY|
+----------------+---------+
|      2125557818|      USA|
|      26.47.1555|   France|
|+33 1 46 62 7555|   France|
|      6265557265|      USA|
|      6505551386|      USA|
|      6505556809|      USA|
|      20.16.1555|   France|
|   +47 2267 3215|   Norway|
|      6505555787|      USA|
|  (1) 47.55.6555|   France|
|    03 9520 4555|Australia|
|      2125551500|      USA|
|      2015559350|      USA|
|      2035552570|      USA|
|      40.67.8555|   France|
|      6175558555|      USA|
|     90-224 8555|  Finland|
|      07-98 9555|   Norway|
|      2155551555|      USA|
|      2125557818|      USA|
+----------------+---------+
only showing top 20 rows



In [11]:
# Normalise phone numbers into PHONE_CLEAN by removing every character that
# is not a digit or "+" (spaces, dots, dashes, parentheses). The raw PHONE
# column is kept alongside so both can be compared.
# regexp_replace and col come from the notebook's top import cell.
df_cleaned = df.withColumn(
    "PHONE_CLEAN",
    regexp_replace(col("PHONE"), r"[^0-9+]", ""),
)

In [12]:
# Show cleaned and raw phone values side by side to sanity-check the cleaning.
comparison_columns = ["PHONE_CLEAN", "PHONE", "COUNTRY"]
df_cleaned.select(*comparison_columns).show()

+------------+----------------+---------+
| PHONE_CLEAN|           PHONE|  COUNTRY|
+------------+----------------+---------+
|  2125557818|      2125557818|      USA|
|    26471555|      26.47.1555|   France|
|+33146627555|+33 1 46 62 7555|   France|
|  6265557265|      6265557265|      USA|
|  6505551386|      6505551386|      USA|
|  6505556809|      6505556809|      USA|
|    20161555|      20.16.1555|   France|
| +4722673215|   +47 2267 3215|   Norway|
|  6505555787|      6505555787|      USA|
|   147556555|  (1) 47.55.6555|   France|
|  0395204555|    03 9520 4555|Australia|
|  2125551500|      2125551500|      USA|
|  2015559350|      2015559350|      USA|
|  2035552570|      2035552570|      USA|
|    40678555|      40.67.8555|   France|
|  6175558555|      6175558555|      USA|
|   902248555|     90-224 8555|  Finland|
|    07989555|      07-98 9555|   Norway|
|  2155551555|      2155551555|      USA|
|  2125557818|      2125557818|      USA|
+------------+----------------+---

In [13]:
# Replace the raw PHONE column with its cleaned counterpart.
# Column names use their exact casing: drop() and withColumnRenamed() are
# silent no-ops when a name does not resolve, which is what lowercase names
# would hit if spark.sql.caseSensitive were enabled — leaving the raw PHONE
# and a stray PHONE_CLEAN in the output without any error.
df = (
    df_cleaned
    .drop("PHONE")
    .withColumnRenamed("PHONE_CLEAN", "PHONE")
)

In [23]:
# Build a per-day output folder name (run_YYYYMMDD) from the current date.
date = f"{datetime.now():%Y%m%d}"
output_path = f"/Users/shivacharan/retail_sales_analytics/data/processed/run_{date}"

# Write the frame to the processed folder as Parquet from a single partition;
# "overwrite" replaces whatever already exists at output_path.
single_partition = df.coalesce(1)
single_partition.write.mode("overwrite").parquet(output_path)

                                                                                

In [22]:
# List the DataFrame's column names for a quick visual check.
df.columns


['ORDERNUMBER',
 'QUANTITYORDERED',
 'PRICEEACH',
 'ORDERLINENUMBER',
 'SALES',
 'STATUS',
 'MONTH_ID',
 'YEAR_ID',
 'PRODUCTLINE',
 'MSRP',
 'PRODUCTCODE',
 'CUSTOMERNAME',
 'ADDRESSLINE1',
 'ADDRESSLINE2',
 'CITY',
 'STATE',
 'POSTALCODE',
 'COUNTRY',
 'CONTACTLASTNAME',
 'CONTACTFIRSTNAME',
 'DEALSIZE',
 'DATE',
 'PHONE']