In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=4a8a81e86185615b559cef488faf5143f5e76889daf50b7f0649d371d52d8afc
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Build (or reuse) the Spark session used by every cell below.
spark = SparkSession.builder.appName("PandasToPySpark").getOrCreate()

# --- Configuration -----------------------------------------------------------
RAW_PATH = './raw_data/Sales.SalesOrderHeader.csv'

# Amount columns exported with a decimal comma (e.g. "123,45"). They are parsed
# as a fixed-point decimal instead of FloatType: a 32-bit float keeps only ~7
# significant digits, so larger amounts were silently rounded.
# NOTE(review): (19, 4) assumes the source columns are SQL Server `money`
# (4 decimal places) - confirm against the source system.
MONEY_COLS = ["SubTotal", "TaxAmt", "Freight", "TotalDue"]
MONEY_TYPE = DecimalType(19, 4)

# Target schema, in output column order (dicts preserve insertion order).
TARGET_SCHEMA = {
    "SalesOrderID": IntegerType(),
    "RevisionNumber": IntegerType(),
    "OrderDate": TimestampType(),
    "DueDate": TimestampType(),
    "ShipDate": TimestampType(),
    "Status": IntegerType(),
    "OnlineOrderFlag": IntegerType(),
    "SalesOrderNumber": StringType(),
    "PurchaseOrderNumber": StringType(),
    "AccountNumber": StringType(),
    "CustomerID": IntegerType(),
    "SalesPersonID": IntegerType(),
    "TerritoryID": IntegerType(),
    "BillToAddressID": IntegerType(),
    "ShipToAddressID": IntegerType(),
    "ShipMethodID": IntegerType(),
    "CreditCardID": IntegerType(),
    "CreditCardApprovalCode": StringType(),
    "CurrencyRateID": IntegerType(),
    "SubTotal": MONEY_TYPE,
    "TaxAmt": MONEY_TYPE,
    "Freight": MONEY_TYPE,
    "TotalDue": MONEY_TYPE,
    "Comment": StringType(),
    "rowguid": StringType(),
    "ModifiedDate": TimestampType(),
}

# --- Load --------------------------------------------------------------------
# Semicolon-delimited with a header row. No schema is inferred, so every column
# arrives as a string and is typed explicitly further down.
raw_df = spark.read.csv(RAW_PATH, sep=';', header=True)

missing_cols = set(TARGET_SCHEMA) - set(raw_df.columns)
assert not missing_cols, f"CSV is missing expected columns: {sorted(missing_cols)}"

# --- Clean -------------------------------------------------------------------
# Blank out cells whose *entire* value is the text "NULL" (any case). The
# pattern is anchored so values that merely contain "null" are not corrupted.
# A single select (instead of withColumn in a loop) keeps the query plan flat.
blanked_df = raw_df.select(
    [regexp_replace(col(c), "(?i)^NULL$", "").alias(c) for c in raw_df.columns]
)

# Keep only orders that have both a salesperson and a credit card. Empty or
# non-numeric IDs cast to null (Spark's default non-ANSI cast) and are dropped.
with_ids_df = blanked_df.filter(
    col("SalesPersonID").cast(IntegerType()).isNotNull()
    & col("CreditCardID").cast(IntegerType()).isNotNull()
)

# --- Apply the strict schema -------------------------------------------------
# Money columns first swap the decimal comma for a point ("123,45" -> "123.45");
# every column is then cast to its target type, in TARGET_SCHEMA order.
df = with_ids_df.select([
    (regexp_replace(col(name), ",", ".") if name in MONEY_COLS else col(name))
    .cast(dtype)
    .alias(name)
    for name, dtype in TARGET_SCHEMA.items()
])

df.show()

+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+-------------+-----------+---------------+---------------+------------+------------+----------------------+--------------+---------+---------+---------+---------+-------+--------------------+-------------------+
|SalesOrderID|RevisionNumber|          OrderDate|            DueDate|           ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|SalesPersonID|TerritoryID|BillToAddressID|ShipToAddressID|ShipMethodID|CreditCardID|CreditCardApprovalCode|CurrencyRateID| SubTotal|   TaxAmt|  Freight| TotalDue|Comment|             rowguid|       ModifiedDate|
+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+-------------+-----------+---------------+---------

In [7]:
# Export the cleaned frame as CSV with a header row, replacing any earlier run.
# Spark writes a *directory* named 'table_salesorderheader.csv'; coalesce(1)
# collapses the data to one partition so it holds a single part-*.csv file.
(
    df.coalesce(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .csv('table_salesorderheader.csv')
)

In [9]:
# Stop the Spark session created earlier in the notebook. Run this only after
# the export cell; `df` cannot be evaluated again once the session is stopped.
spark.stop()