In [1]:
import pandas as pd

In [2]:
# Directory that holds the sales CSV files; this is a relative path, so it is
# resolved against the notebook's working directory.
base_dir = 'car_sales_data'

In [3]:
import glob

In [4]:
# List every CSV directly under base_dir. The matches come back unsorted
# (see the output below), so do not rely on chronological order.
glob.glob(f'{base_dir}/*.csv')

['car_sales_data/sales_2016_01.csv',
 'car_sales_data/sales_2018_10.csv',
 'car_sales_data/sales_2018_04.csv',
 'car_sales_data/sales_2022_11.csv',
 'car_sales_data/sales_2022_05.csv',
 'car_sales_data/sales_2022_04.csv',
 'car_sales_data/sales_2022_10.csv',
 'car_sales_data/sales_2020_01.csv',
 'car_sales_data/sales_2018_05.csv',
 'car_sales_data/sales_2018_11.csv',
 'car_sales_data/sales_2016_02.csv',
 'car_sales_data/sales_2018_07.csv',
 'car_sales_data/sales_2020_03.csv',
 'car_sales_data/sales_2022_06.csv',
 'car_sales_data/sales_2022_12.csv',
 'car_sales_data/sales_2022_07.csv',
 'car_sales_data/sales_2020_02.csv',
 'car_sales_data/sales_2018_12.csv',
 'car_sales_data/sales_2018_06.csv',
 'car_sales_data/sales_2016_03.csv',
 'car_sales_data/sales_2018_02.csv',
 'car_sales_data/sales_2016_07.csv',
 'car_sales_data/sales_2020_06.csv',
 'car_sales_data/sales_2020_12.csv',
 'car_sales_data/sales_2022_03.csv',
 'car_sales_data/sales_2022_02.csv',
 'car_sales_data/sales_2020_07.csv',
 

In [6]:
# Count the rows across all CSV files, loading one file at a time so that
# only a single file's data is held in memory at once.
total_records = 0

for csv_path in glob.glob(f'{base_dir}/*.csv'):
    total_records += len(pd.read_csv(csv_path))

total_records

12899990

In [7]:
# Build an interim frame with the number of sales per state for each file.
#
# Every CSV is aggregated on its own, so only one file is loaded at a time.
# The small per-file results are collected in a list and concatenated once
# at the end. Growing a DataFrame with pd.concat inside the loop re-copies
# all accumulated rows on every iteration, and seeding it with an empty
# object-dtype frame can leave 'sale_count' as object dtype and triggers
# pandas' FutureWarning about concatenating empty entries.
per_file_state_counts = []

for file in glob.glob(f'{base_dir}/*.csv'):
    car_sales = pd.read_csv(file)
    sale_count_by_state = car_sales.groupby('state').size().reset_index(name='sale_count')
    per_file_state_counts.append(sale_count_by_state)

if per_file_state_counts:
    # The per-file index values are kept, as before; the next aggregation
    # step groups by 'state' and does not depend on the index.
    sales_count_by_state_interim = pd.concat(per_file_state_counts)
else:
    # pd.concat raises on an empty list; fall back to an empty frame with
    # the expected columns when no CSV file matches.
    sales_count_by_state_interim = pd.DataFrame(columns=['state', 'sale_count'])


In [8]:
# Inspect the stacked per-file counts: one row per state per file.
sales_count_by_state_interim

Unnamed: 0,state,sale_count
0,CA,7546
1,FL,7512
2,GA,7542
3,IL,7458
4,MI,7531
...,...,...
5,NC,9353
6,NY,9521
7,OH,9527
8,PA,9490


In [9]:
# Collapse the per-file rows into a single total per state.
# After the groupby, 'state' is the index of the result.
sales_count_by_state = (
    sales_count_by_state_interim
    .groupby('state')
    .agg(sale_count=('sale_count', 'sum'))
)

In [10]:
# Show the states ranked by total number of sales, highest first.
# reset_index() turns 'state' back into a regular column for display.
(
    sales_count_by_state
    .reset_index()
    .sort_values('sale_count', ascending=False)
)

Unnamed: 0,state,sale_count
9,TX,1290867
5,NC,1290735
7,OH,1290729
6,NY,1290693
4,MI,1290309
8,PA,1289814
3,IL,1289672
0,CA,1289455
1,FL,1289117
2,GA,1288599


In [11]:
import polars as pl

In [12]:
# Read all the files in one call: the glob pattern is passed straight to
# polars, and the matching CSVs are loaded into a single DataFrame.
# Note that this rebinds `car_sales`, which the pandas cells above used for
# a single file's frame.
car_sales = pl.read_csv(f'{base_dir}/*.csv')

In [13]:
# (rows, columns) of the combined polars frame; the row count should match
# the total computed with pandas above.
car_sales.shape

(12899990, 15)

In [16]:
# Per-state revenue and sale counts with polars.
# Both aggregates are computed from the 'sale_price' column in one pass
# over the grouped data.
sale_price = pl.col('sale_price')

sales_by_state = car_sales.group_by('state').agg(
    sale_price.sum().alias('total_revenue'),
    sale_price.count().alias('sales_count'),
)

In [19]:
# Sanity check: one row per state, with the state column plus the two aggregates.
sales_by_state.shape

(10, 3)

In [20]:
# Switch polars' float display to the "full" format so that the revenue
# totals in the tables below are printed with all their digits.
pl.Config.set_fmt_float("full")

polars.config.Config

In [21]:
# Preview the first rows of the aggregate; no ordering has been applied yet.
sales_by_state.head()

state,total_revenue,sales_count
str,f64,u32
"""MI""",48389009099.38,1290309
"""PA""",48412069561.59,1289814
"""GA""",48347024626.96,1288599
"""CA""",48394059754.01,1289455
"""OH""",48412988663.34,1290729


In [22]:
# Rank the states by number of sales, largest first, for comparison with
# the pandas result above.
sales_by_state.sort('sales_count', descending=True)

state,total_revenue,sales_count
str,f64,u32
"""TX""",48429721750.82,1290867
"""NC""",48410710436.63,1290735
"""OH""",48412988663.34,1290729
"""NY""",48417155335.06,1290693
"""MI""",48389009099.38,1290309
"""PA""",48412069561.59,1289814
"""IL""",48399698308.21,1289672
"""CA""",48394059754.01,1289455
"""FL""",48361954451.31,1289117
"""GA""",48347024626.96,1288599


In [23]:
from pyspark.sql import SparkSession

In [24]:
# Obtain the Spark session used by the remaining cells, registered under
# the application name "Car Sales".
session = (
    SparkSession.builder
    .appName("Car Sales")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/05 09:56:01 WARN Utils: Your hostname, ITVersitys-Mac-Studio.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.5 instead (on interface en1)
26/01/05 09:56:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/05 09:56:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/05 09:56:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [25]:
# Load the CSV data under base_dir as a Spark DataFrame. The first line of
# each file is treated as the header, and column types are inferred from
# the data rather than declared up front.
# Note that this rebinds `car_sales`, which previously held the polars frame.
car_sales = (
    session.read
    .csv(base_dir, header=True, inferSchema=True)
)

                                                                                

In [26]:
# Row count as seen by Spark; it should match the pandas and polars totals.
car_sales.count()

                                                                                

12899990

In [28]:
from pyspark.sql.functions import count, sum, col

In [29]:
# Per-state revenue and sale counts with Spark.
# `sum`, `count` and `col` here are the pyspark.sql.functions imported in
# the previous cell, not Python built-ins.
sales_by_state = (
    car_sales
    .groupBy('state')
    .agg(
        sum(col('sale_price')).alias('total_revenue'),
        count(col('sale_price')).alias('sales_count'),
    )
)

In [31]:
# Rank the states by number of sales, largest first, and convert the small
# aggregated result to a pandas DataFrame for display in the notebook.
(
    sales_by_state
    .orderBy(col('sales_count').desc())
    .toPandas()
)

                                                                                

Unnamed: 0,state,total_revenue,sales_count
0,TX,48429720000.0,1290867
1,NC,48410710000.0,1290735
2,OH,48412990000.0,1290729
3,NY,48417160000.0,1290693
4,MI,48389010000.0,1290309
5,PA,48412070000.0,1289814
6,IL,48399700000.0,1289672
7,CA,48394060000.0,1289455
8,FL,48361950000.0,1289117
9,GA,48347020000.0,1288599
