# Part 3: Data Analytics with PySpark

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import boto3
import json
from io import StringIO

import warnings
warnings.filterwarnings('ignore')

In [27]:
# Initialize Spark
spark = SparkSession.builder \
    .appName("RearcDataAnalytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Configuration
BUCKET_NAME = 'rearc-data-quest-us-east-01'
s3_client = boto3.client('s3')

In [28]:
# Load BLS data
obj = s3_client.get_object(Bucket=BUCKET_NAME, Key='bls-data/pub/time.series/pr/pr.data.0.Current')
csv_content = obj['Body'].read().decode('utf-8')

# Create DataFrame from CSV content
from pyspark.sql import Row
import csv

lines = csv_content.strip().split('\n')
reader = csv.DictReader(StringIO(csv_content), delimiter='\t')
rows = [Row(**{k.strip(): v.strip() if v else None for k, v in row.items()}) for row in reader]

bls_df = spark.createDataFrame(rows)
bls_df = bls_df.withColumn("year", col("year").cast(IntegerType())) \
             .withColumn("value", col("value").cast(DoubleType()))

print(f"BLS data: {bls_df.count()} rows")

print(bls_df.show(5, truncate=False))

BLS data: 37182 rows
+-----------+----+------+-----+--------------+
|series_id  |year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|Q01   |2.6  |null          |
|PRS30006011|1995|Q02   |2.1  |null          |
|PRS30006011|1995|Q03   |0.9  |null          |
|PRS30006011|1995|Q04   |0.1  |null          |
|PRS30006011|1995|Q05   |1.4  |null          |
+-----------+----+------+-----+--------------+
only showing top 5 rows

None


In [18]:
# Load population data
obj = s3_client.get_object(Bucket=BUCKET_NAME, Key='api-data/population_data.json')
json_data = json.loads(obj['Body'].read().decode('utf-8'))

# Extract data array
if 'data' in json_data and 'data' in json_data['data']:
    pop_data = json_data['data']['data']
else:
    pop_data = json_data.get('data', json_data)

pop_df = spark.createDataFrame([Row(**row) for row in pop_data])
pop_df = pop_df.withColumn("Year", col("Year").cast(IntegerType())) \
             .withColumn("Population", col("Population").cast(LongType()))

print(f"Population data: {pop_df.count()} rows")
print(pop_df.orderBy('Year').show(truncate=False))o

Population data: 10 rows
+---------+-------------+----+----------+
|Nation ID|Nation       |Year|Population|
+---------+-------------+----+----------+
|01000US  |United States|2013|316128839 |
|01000US  |United States|2014|318857056 |
|01000US  |United States|2015|321418821 |
|01000US  |United States|2016|323127515 |
|01000US  |United States|2017|325719178 |
|01000US  |United States|2018|327167439 |
|01000US  |United States|2019|328239523 |
|01000US  |United States|2021|331893745 |
|01000US  |United States|2022|333287562 |
|01000US  |United States|2023|334914896 |
+---------+-------------+----+----------+

None


## Analysis 1: Population Statistics (2013-2018)

In [12]:
# Filter and calculate population stats
pop_filtered = pop_df.filter((col("Year") >= 2013) & (col("Year") <= 2018))
stats = pop_filtered.agg(
    mean("Population").alias("mean_pop"),
    stddev("Population").alias("std_pop")
).collect()[0]

print(f"Mean Population: {stats.mean_pop:,.0f}")
print(f"Std Deviation: {stats.std_pop:,.0f}")

Mean Population: 322,069,808
Std Deviation: 4,158,441


## Analysis 2: Best Year per Series

In [13]:
# Filter quarterly data and sum by series/year
quarterly_df = bls_df.filter(col("period").startswith("Q"))

yearly_sums = quarterly_df.groupBy("series_id", "year") \
    .agg(sum("value").alias("total_value"))

# Best year for each series
from pyspark.sql.window import Window
window = Window.partitionBy("series_id").orderBy(desc("total_value"))

best_years = yearly_sums.withColumn("rank", row_number().over(window)) \
    .filter(col("rank") == 1) \
    .select("series_id", "year", col("total_value").alias("value")) \
    .orderBy("series_id")

print(f"Total series analyzed: {best_years.count()}")
best_years.show(5, truncate=False)

Total series analyzed: 282
+-----------+----+------------------+
|series_id  |year|value             |
+-----------+----+------------------+
|PRS30006011|2022|20.5              |
|PRS30006012|2022|17.1              |
|PRS30006013|1998|705.895           |
|PRS30006021|2010|17.7              |
|PRS30006022|2010|12.399999999999999|
+-----------+----+------------------+
only showing top 5 rows



## Analysis 3: Combined Report

In [15]:
# Filter specific series and join with population data
filtered_bls = bls_df.filter(
    (col("series_id") == "PRS30006032") & 
    (col("period") == "Q01")
)

combined = filtered_bls.join(
    pop_df.select(col("year").alias("pop_year"), "Population"),
    filtered_bls.year == col("pop_year"),
    "left"
).select(
    "series_id", "year", "period", "value", "Population"
).orderBy("year")

total_records = combined.count()
with_population = combined.filter(col("Population").isNotNull()).count()

print(f"Total records: {total_records}")
print(f"Records with population: {with_population}")
combined.show(10, truncate=False)

Total records: 31
Records with population: 10
+-----------+----+------+-----+----------+
|series_id  |year|period|value|Population|
+-----------+----+------+-----+----------+
|PRS30006032|1995|Q01   |0.0  |null      |
|PRS30006032|1996|Q01   |-4.2 |null      |
|PRS30006032|1997|Q01   |2.8  |null      |
|PRS30006032|1998|Q01   |0.9  |null      |
|PRS30006032|1999|Q01   |-4.1 |null      |
|PRS30006032|2000|Q01   |0.5  |null      |
|PRS30006032|2001|Q01   |-6.3 |null      |
|PRS30006032|2002|Q01   |-6.6 |null      |
|PRS30006032|2003|Q01   |-5.7 |null      |
|PRS30006032|2004|Q01   |2.0  |null      |
+-----------+----+------+-----+----------+
only showing top 10 rows

