# PySpark—Expanded Hands-on Notebook (Runnable + Fallback)
This notebook attempts to create a local SparkSession and run small examples. If Spark/Java is not available
in your environment, the notebook will gracefully fall back to using pandas for the same examples so you can
develop logic locally and then run on a cluster.

Files: large synthetic dataset (~200k rows) and a customer lookup CSV are included in `/samples/`.

## 0. Quick setup (install pyspark if needed)
**What this cell does:** checks whether pyspark is importable and, if it is missing, prints the install command. Running Spark locally also requires a Java runtime, so a successful `pip install` does not guarantee a working session.
**Notes:** If you have a cluster or local Spark installed, prefer to run the notebook with that Spark runtime.
**Keywords:** pyspark, SparkSession, local[*]

In [None]:
# Check whether pyspark is importable; if not, print install instructions.
import importlib.util  # find_spec lives in the importlib.util submodule
pyspark_spec = importlib.util.find_spec('pyspark')
if pyspark_spec is None:
    print('pyspark not found in this environment. You can install with: pip install pyspark')
    print('If you plan to run Spark locally, ensure Java (JDK 8/11) is installed and JAVA_HOME is set.')
else:
    import pyspark
    print('pyspark available:', pyspark.__version__)
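
Since a missing Java runtime is the usual reason the next section falls back to pandas, it can help to check for one up front. The helper below is a minimal sketch using only the standard library; `java_status` is a hypothetical name introduced here, and the check only looks in the two common locations (`$JAVA_HOME/bin/java` and `java` on `PATH`), so treat a negative result as a hint rather than proof.

```python
import os
import shutil


def java_status():
    """Report whether a Java runtime looks reachable from this process."""
    java_home = os.environ.get("JAVA_HOME")
    java_on_path = shutil.which("java")  # None if `java` is not on PATH

    # Look for the launcher inside JAVA_HOME, if that variable is set.
    java_in_home = None
    if java_home:
        candidate = os.path.join(java_home, "bin", "java")
        if os.path.isfile(candidate) or os.path.isfile(candidate + ".exe"):
            java_in_home = candidate

    return {
        "JAVA_HOME": java_home,
        "java_on_path": java_on_path,
        "java_in_JAVA_HOME": java_in_home,
        "looks_usable": bool(java_on_path or java_in_home),
    }


status = java_status()
for key, value in status.items():
    print(f"{key}: {value}")
```

If `looks_usable` is `False`, expect the SparkSession cell in section 1 to take the pandas fallback.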

## 1. Create SparkSession (local mode) with safe fallback
**What this cell does:** tries to create a SparkSession with master `local[*]`. If that fails (typically because Java is missing), `spark` stays `None` and the remaining cells use pandas.
**Notes on internals:** `local[*]` runs Spark inside a single JVM with as many worker threads as there are logical cores. A SparkSession needs a JVM, so creating one fails without Java.

In [None]:
# Create SparkSession if possible; otherwise set spark = None and continue with pandas fallback
spark = None
try:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master('local[*]').appName('pyspark-notebook-local').getOrCreate()
    print('SparkSession created:', spark)
except Exception as e:
    print('Could not create SparkSession:', e)
    spark = None

# Show available spark variable (None if not created)
spark

## 2. Read the large CSV (Spark if available; otherwise pandas)
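**What this cell does:** reads the large CSV into a Spark DataFrame (or a pandas DataFrame) and shows the schema, row count, and first rows. With `inferSchema` enabled, Spark makes an extra pass over the file to determine column types.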
**Keywords:** spark.read.csv, inferSchema, header, toPandas()

In [None]:
import os
csv_path = r'/mnt/data/pyspark_expanded/samples/sales_200k.csv'
lookup_path = r'/mnt/data/pyspark_expanded/samples/customers_lookup.csv'
print('files exist?', os.path.exists(csv_path), os.path.exists(lookup_path))

if spark is not None:
    sdf = spark.read.option('header', True).option('inferSchema', True).csv(csv_path)
    print('Spark schema:'); sdf.printSchema()
    print('count (Spark):', sdf.count())
    display(sdf.limit(5).toPandas())
else:
    import pandas as pd
    pdf = pd.read_csv(csv_path, parse_dates=['event_ts'])
    print('pandas df shape:', pdf.shape)
    display(pdf.head())

## 3. Simple Transformations: filter, select, new column
**What:** filters paid orders, computes VAT (10%), and selects columns. Demonstrates the difference between lazy (Spark) and eager (pandas) execution.
**Notes:** In Spark, transformations are lazy: nothing runs until an action (show/count/write) triggers execution.

In [None]:
import time
t0 = time.time()
if spark is not None:
    from pyspark.sql import functions as F
    sdf2 = (
        sdf.filter(F.col('is_paid') == True)
           .withColumn('vat', F.col('amount') * 0.10)
           .select('order_id', 'customer_id', 'amount', 'vat')
    )
    print('constructed transformations (lazy). Trigger action: show() -> executes job')
    sdf2.show(5)
    print('count:', sdf2.count())
else:
    pdf2 = pdf[pdf['is_paid'] == True].copy()
    pdf2['vat'] = pdf2['amount'] * 0.10
    print('pandas eager result shape:', pdf2.shape)
    display(pdf2.head())
print('elapsed:', time.time() - t0)
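
The lazy/eager distinction can be illustrated without Spark at all. The sketch below uses a plain Python generator as an analogy: building the pipeline calls nothing, and consuming it runs the whole chain. It is only an analogy (Spark builds and optimizes a query plan rather than chaining generators), and the three rows are made-up sample data.

```python
# Count how often each step actually runs.
calls = {"filter": 0, "vat": 0}


def is_paid(row):
    calls["filter"] += 1
    return row["is_paid"]


def add_vat(row):
    calls["vat"] += 1
    return {**row, "vat": row["amount"] * 0.10}


rows = [
    {"order_id": 1, "amount": 100.0, "is_paid": True},
    {"order_id": 2, "amount": 50.0, "is_paid": False},
    {"order_id": 3, "amount": 80.0, "is_paid": True},
]

# "Transformations": defining the pipeline does not call is_paid or add_vat.
pipeline = (add_vat(r) for r in rows if is_paid(r))
calls_before = dict(calls)

# "Action": consuming the generator executes filter and VAT for real.
result = list(pipeline)

print("before action:", calls_before)
print("after action: ", calls)
print("paid order ids:", [r["order_id"] for r in result])
```

The pandas branch of the cell above corresponds to running each step immediately, which is why it has no separate "trigger" line.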

## 4. Aggregations & GroupBy (total amount per customer) with timing
**What this cell does:** computes the order count and total amount per customer using Spark `groupBy` or pandas `groupby`, and times whichever path runs.
**Keywords:** groupBy, agg, collect, toPandas, compute

In [None]:
import time
t0 = time.time()
if spark is not None:
    from pyspark.sql import functions as F
    res_spark = sdf.groupBy('customer_id').agg(F.count('*').alias('cnt'), F.sum('amount').alias('total_amount'))
    # show top 5 by total_amount (triggering action)
    top5 = res_spark.orderBy(F.col('total_amount').desc()).limit(5)
    top5.show()
    spark_time = time.time() - t0
    print('Spark elapsed (s):', round(spark_time,3))
else:
    import pandas as pd
    t0p = time.time()
    res_pd = pdf.groupby('customer_id').agg(cnt=('order_id','count'), total_amount=('amount','sum')).reset_index()
    res_pd = res_pd.sort_values('total_amount', ascending=False).head(5)
    display(res_pd)
    pandas_time = time.time() - t0p
    print('pandas elapsed (s):', round(pandas_time,3))
print('wall elapsed:', round(time.time()-t0,3))
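
For sums and counts, a distributed `groupBy` typically runs in two phases: each partition is aggregated locally, and only the small partial results are combined across partitions. The sketch below mimics that shape in plain Python. The function names and the two sample partitions are illustrative assumptions, not Spark internals.

```python
from collections import defaultdict


def partial_aggregate(partition):
    """Phase 1: aggregate rows inside one partition -> {customer_id: [cnt, total]}."""
    acc = defaultdict(lambda: [0, 0.0])
    for customer_id, amount in partition:
        acc[customer_id][0] += 1
        acc[customer_id][1] += amount
    return acc


def merge_partials(partials):
    """Phase 2: combine the per-partition results into final counts and totals."""
    final = defaultdict(lambda: [0, 0.0])
    for partial in partials:
        for customer_id, (cnt, total) in partial.items():
            final[customer_id][0] += cnt
            final[customer_id][1] += total
    return final


# Two made-up partitions of (customer_id, amount) rows.
partitions = [
    [("cust_1", 10.0), ("cust_2", 5.0), ("cust_1", 4.0)],
    [("cust_2", 7.5), ("cust_3", 1.0)],
]

totals = merge_partials(partial_aggregate(p) for p in partitions)

# Equivalent of orderBy(total_amount desc).limit(2)
top = sorted(totals.items(), key=lambda kv: kv[1][1], reverse=True)[:2]
print(top)
```

Only one small record per customer per partition crosses the partition boundary, which is why this kind of aggregation tends to scale well.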

## 5. Join with lookup table (small broadcast join) and timing
**What this cell does:** reads the lookup CSV and joins it to the sales data to add customer metadata. On Spark, the join carries an explicit `broadcast` hint, which is appropriate here because the lookup table is small.
**Keywords:** broadcast, join, small lookup

In [None]:
import time
t0 = time.time()
if spark is not None:
    from pyspark.sql import functions as F
    from pyspark.sql.functions import broadcast
    # lookup_path was defined in section 2
    small = spark.read.option('header', True).option('inferSchema', True).csv(lookup_path)
    # hint Spark to ship the small lookup table to every executor instead of shuffling sdf
    joined = sdf.join(broadcast(small), on='customer_id', how='left')
    print('joined schema:'); joined.printSchema()
    joined.show(5)
    print('joined count:', joined.count())
    print('spark join elapsed:', time.time()-t0)
else:
    import pandas as pd
    small = pd.read_csv(lookup_path)  # lookup_path was defined in section 2
    joined = pdf.merge(small, on='customer_id', how='left')
    display(joined.head())
    print('pandas join elapsed:', time.time()-t0)
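
The idea behind a broadcast join can be sketched in a few lines: build a hash table from the small side once, then let every partition of the large side probe it locally, so the large side never has to be shuffled. The code below is a plain-Python illustration under two assumptions: `customer_id` is unique in the lookup, and `segment` is a hypothetical lookup column invented for the example.

```python
def broadcast_left_join(partitions, lookup_rows, key):
    """Left-join each partition against a small lookup held fully in memory."""
    # Build the hash table once; this plays the role of the broadcast copy.
    lookup = {row[key]: row for row in lookup_rows}

    # Unmatched rows still get the lookup columns, filled with None.
    extra_cols = [c for c in (lookup_rows[0] if lookup_rows else {}) if c != key]
    empty = dict.fromkeys(extra_cols)

    for partition in partitions:  # each partition is processed independently
        for row in partition:
            yield {**row, **lookup.get(row[key], empty)}


# Made-up data: two partitions of sales rows and a one-row lookup.
sales_partitions = [
    [{"order_id": 1, "customer_id": "cust_1", "amount": 10.0}],
    [{"order_id": 2, "customer_id": "cust_9", "amount": 3.0}],
]
customers = [{"customer_id": "cust_1", "segment": "gold"}]

joined_rows = list(broadcast_left_join(sales_partitions, customers, "customer_id"))
for row in joined_rows:
    print(row)
```

The trade-off is memory: the whole lookup must fit on every worker, which is why the hint suits small dimension tables only.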

## 6. Writing Results (Parquet) — with explanation
**What this cell does:** writes a sample result to Parquet (if Spark available) or pandas to_parquet. Parquet is columnar and preserves schema.
**Notes:** Spark write will create a folder with part files. pandas requires pyarrow or fastparquet to write parquet.

In [None]:
import os
out_dir = os.path.join(os.path.dirname(csv_path), 'out')  # samples directory from section 2
os.makedirs(out_dir, exist_ok=True)
if spark is not None:
    # simple (unpartitioned) write of a 1,000-row sample; Spark creates a folder of part files
    try:
        sdf.limit(1000).write.mode('overwrite').parquet(out_dir + '/sample_parquet')
        print('Wrote Parquet to', out_dir + '/sample_parquet')
    except Exception as e:
        print('Could not write parquet with Spark:', e)
else:
    try:
        pdf.head(1000).to_parquet(out_dir + '/sample_parquet_pandas.parquet')
        print('Wrote pandas parquet to', out_dir + '/sample_parquet_pandas.parquet')
    except Exception as e:
        print('Could not write parquet via pandas:', e)

## 7. Exercises — Beginner → Advanced (attempt these)
Below are exercises you can run in this notebook. Solutions follow in the next cell.

### Beginner
1. Count number of paid orders.
2. Find top 10 customers by total amount.

### Intermediate
3. For each day, compute number of orders and total amount.
4. Compute rolling 7-day sum of amount for a specific customer (use pandas or Spark window functions).

### Advanced
5. Implement a UDF (pandas_udf if Spark available) to categorize orders into 'small','medium','large' by amount quantiles.
6. Build a simple ML pipeline using Spark MLlib (e.g., predict `is_paid` from `amount` and `items`) — conceptual if Spark not available.


## 8. Exercise Solutions (runnable)
**What this cell does:** provides solutions for the exercises above. If Spark is available, uses Spark APIs; otherwise uses pandas equivalents.
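
One detail to keep in mind for exercise 4 when comparing the two engines: the window edges differ. Spark's `rangeBetween(-7*24*3600, 0)` includes a row exactly seven days old, while pandas `rolling('7D')` defaults to `closed='right'` and excludes it (passing `closed='both'` aligns the two). The stdlib sketch below shows the difference; `rolling_sum` is a hypothetical helper, and it assumes the events are sorted with distinct timestamps.

```python
from collections import deque
from datetime import datetime, timedelta


def rolling_sum(events, window=timedelta(days=7), include_left_edge=True):
    """Trailing time-window sum over (timestamp, amount) pairs sorted by timestamp.

    include_left_edge=True  -> window is [t - window, t]  (Spark rangeBetween style)
    include_left_edge=False -> window is (t - window, t]  (pandas rolling default)
    """
    buf = deque()
    running = 0.0
    out = []
    for ts, amount in events:
        buf.append((ts, amount))
        running += amount
        cutoff = ts - window
        # Evict rows that fell out of the window on the left.
        while buf and (
            buf[0][0] < cutoff or (not include_left_edge and buf[0][0] == cutoff)
        ):
            running -= buf.popleft()[1]
        out.append((ts, running))
    return out


d0 = datetime(2024, 1, 1)
events = [(d0, 10.0), (d0 + timedelta(days=3), 5.0), (d0 + timedelta(days=7), 1.0)]

inclusive = rolling_sum(events, include_left_edge=True)
exclusive = rolling_sum(events, include_left_edge=False)
print("left edge included:", [total for _, total in inclusive])
print("left edge excluded:", [total for _, total in exclusive])
```

The two results only differ on the last row, where the first event is exactly seven days old.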

In [None]:
# Solutions for exercises (will run for whichever engine is available)
import time
if spark is not None:
    from pyspark.sql import functions as F
    # 1. Count paid orders
    c1 = sdf.filter(F.col('is_paid')==True).count()
    print('1) paid orders count (spark):', c1)
    # 2. top 10 customers by total amount
    t2 = sdf.groupBy('customer_id').agg(F.sum('amount').alias('total')).orderBy(F.col('total').desc()).limit(10)
    print('2) top 10 (spark):'); t2.show()
    # 3. daily orders and totals (to_date reduces the timestamp to a calendar day)
    daily = sdf.withColumn('day', F.to_date('event_ts')).groupBy('day').agg(F.count('*').alias('orders'), F.sum('amount').alias('total'))
    print('3) daily (spark):'); daily.show(5)
    # 4. rolling 7-day per customer (window) -- demonstration for a sample customer
    from pyspark.sql.window import Window
    w = (Window.partitionBy('customer_id')
               .orderBy(F.col('event_ts').cast('timestamp').cast('long'))  # epoch seconds
               .rangeBetween(-7 * 24 * 3600, 0))
    roll = (sdf.filter(F.col('customer_id') == 'cust_1')  # filter first so the window only sees this customer
               .withColumn('rolling_7d', F.sum('amount').over(w))
               .select('event_ts', 'amount', 'rolling_7d')
               .orderBy('event_ts')
               .limit(5))
    print('4) rolling window sample (spark):'); roll.show()
    # 5. UDF categorization using pandas_udf (if available)
    try:
        from pyspark.sql.functions import pandas_udf
        import pandas as pd
        # Compute the cut points once over the whole column: a pandas_udf only sees one
        # Arrow batch at a time, so quantiles taken inside it would differ per batch.
        q33, q66 = sdf.approxQuantile('amount', [0.33, 0.66], 0.01)
        @pandas_udf('string')
        def quantile_cat(s: pd.Series) -> pd.Series:
            bins = [-float('inf'), q33, q66, float('inf')]
            return pd.cut(s, bins=bins, labels=['small','medium','large']).astype(str)
        sample = sdf.select('amount').limit(1000).withColumn('cat', quantile_cat(F.col('amount')))
        print('5) pandas_udf sample:'); sample.show(5)
    except Exception as e:
        print('pandas_udf not available or failed:', e)
    # 6. MLlib conceptual demo (sketch)
    print('6) MLlib pipeline sketch: use VectorAssembler -> LogisticRegression; fit on small sample. Run on cluster when available.')
else:
    # pandas solutions
    import pandas as pd
    pdf = pd.read_csv(csv_path, parse_dates=['event_ts'])  # csv_path was defined in section 2
    # 1
    c1 = pdf[pdf['is_paid']==True].shape[0]
    print('1) paid orders count (pandas):', c1)
    # 2
    t2 = pdf.groupby('customer_id')['amount'].sum().sort_values(ascending=False).head(10)
    print('2) top 10 (pandas):'); display(t2)
    # 3 daily
    daily = pdf.set_index('event_ts').resample('D').agg({'order_id':'count','amount':'sum'}).rename(columns={'order_id':'orders'})
    print('3) daily (pandas):'); display(daily.head())
    # 4 rolling 7-day for cust_1
    cust = pdf[pdf['customer_id']=='cust_1'].set_index('event_ts').sort_index()
    cust['rolling_7d'] = cust['amount'].rolling('7D').sum()
    print('4) rolling 7d (pandas) sample:'); display(cust.head())
    # 5 categorize by quantiles
    q = pdf['amount'].quantile([0.33,0.66])
    pdf['cat'] = pd.cut(pdf['amount'], bins=[-float('inf'), q.loc[0.33], q.loc[0.66], float('inf')], labels=['small','medium','large'])
    print('5) categories value counts:'); display(pdf['cat'].value_counts())
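    # 6 ML: sketch using scikit-learn locally (conceptual)
    print('6) ML sketch (pandas path): e.g. fit a scikit-learn LogisticRegression on amount and items to predict is_paid.')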
print('Solutions executed.')
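
Exercise 6 is only described in words above, so here is one possible shape for the Spark path. It is a sketch, not a tuned model: `fit_is_paid_model` is a hypothetical helper, it assumes `is_paid` is boolean or 0/1 and that `amount` and `items` are numeric, and the 80/20 split and default `LogisticRegression` settings are arbitrary choices. The pyspark imports sit inside the function so the definition itself runs even where pyspark is not installed.

```python
def fit_is_paid_model(sdf, seed=42):
    """Fit a logistic regression predicting is_paid from amount and items.

    Assumes `sdf` is a Spark DataFrame with columns amount, items and is_paid.
    Returns (fitted PipelineModel, area under ROC on the held-out split).
    """
    # Imported lazily so defining this function does not require pyspark.
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql import functions as F

    # Cast everything to double; MLlib expects a numeric label column.
    data = sdf.select(
        F.col("amount").cast("double").alias("amount"),
        F.col("items").cast("double").alias("items"),
        F.col("is_paid").cast("double").alias("label"),
    ).dropna()

    train, test = data.randomSplit([0.8, 0.2], seed=seed)

    pipeline = Pipeline(stages=[
        VectorAssembler(inputCols=["amount", "items"], outputCol="features"),
        LogisticRegression(featuresCol="features", labelCol="label"),
    ])
    model = pipeline.fit(train)

    evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
    auc = evaluator.evaluate(model.transform(test))
    return model, auc
```

With a live session, a call such as `model, auc = fit_is_paid_model(sdf.limit(20000))` keeps the fit small enough for local mode. Since the dataset is synthetic, the resulting AUC says little about real predictive value.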

## 9. Profiling cell (compare pandas timings for key ops)
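**What this cell does:** times read, groupby, and join in pandas and, if Spark is available, the Spark equivalents, to give a rough idea of wall-clock differences in your environment. Because Spark is lazy, `spark_read_s` mostly reflects schema inference, while the Spark groupby and join timings include reading the CSV again.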
**Keywords:** time, benchmark, profiling

In [None]:
import time
timings = {}
# pandas timings
import pandas as pd
t0 = time.time()
pdf = pd.read_csv(csv_path, parse_dates=['event_ts'])  # csv_path was defined in section 2
timings['pandas_read_s'] = time.time() - t0
t0 = time.time()
res = pdf.groupby('customer_id')['amount'].sum()
timings['pandas_groupby_s'] = time.time() - t0
t0 = time.time()
small = pd.read_csv(lookup_path)  # lookup_path was defined in section 2
joined = pdf.merge(small, on='customer_id', how='left')
timings['pandas_join_s'] = time.time() - t0

# spark timings (if available)
if spark is not None:
    from pyspark.sql import functions as F
    t0 = time.time()
    sdf_local = spark.read.option('header',True).option('inferSchema',True).csv(csv_path)
    timings['spark_read_s'] = time.time() - t0
    t0 = time.time()
    g = sdf_local.groupBy('customer_id').agg(F.sum('amount'))
    g.count()  # trigger
    timings['spark_groupby_s'] = time.time() - t0
    t0 = time.time()
    small_s = spark.read.option('header',True).option('inferSchema',True).csv(lookup_path)
    from pyspark.sql.functions import broadcast
    j = sdf_local.join(broadcast(small_s), on='customer_id', how='left')
    j.count()
    timings['spark_join_s'] = time.time() - t0

print('Timings (seconds):', timings)
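
The repeated `t0 = time.time()` pattern above works, but a small helper makes the measurements less error-prone. The sketch below is one way to do it: `timed` and `best_of` are hypothetical helpers, `time.perf_counter()` is used because it is a monotonic clock intended for measuring intervals, and the workloads are stand-ins rather than the notebook's real operations.

```python
import time
from contextlib import contextmanager

demo_timings = {}


@contextmanager
def timed(label, store=demo_timings):
    """Record the wall-clock duration of the with-block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        store[label] = time.perf_counter() - start


def best_of(fn, repeats=3):
    """Run fn several times and keep the fastest wall-clock time in seconds."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - start)
    return best


# Stand-in workloads; swap in the pandas or Spark calls from the cell above.
with timed("sum_1m"):
    total = sum(range(1_000_000))

fastest_sort = best_of(lambda: sorted(range(100_000), reverse=True))

print("timings:", demo_timings)
print("fastest sort (s):", round(fastest_sort, 4))
```

Taking the best of several runs reduces noise from caching and JVM warm-up, which matters especially for the first Spark job in a session.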

## 10. Markdown Cheat-sheet (separate file included in ZIP)
A concise Markdown cheat sheet is included in the ZIP as `pyspark_cheatsheet.md`. It summarizes commands, common functions, and tuning tips.