# Example Data Science Notebook

This notebook demonstrates basic data analysis workflows.

In [None]:
import sys
import os
from pathlib import Path


# Add src to path
# sys.path.insert(0, str(Path().resolve().parent / 'src'))

sys.path.append(str(ROOT / "src"))
os.environ["SPARK_LOCAL_DIRS"] = str((ROOT / "spark_tmp").resolve())

import pandas as pd
import numpy as np
# import matplotlib.pyplot as plt
# import seaborn as sns

from ds.analyzer import DataAnalyzer
from de.load.loader import DataLoader

print("Imports successful!")

## Load Sample Data

In [None]:
import pandas as pd
import numpy as np

In [None]:
# Create sample data
np.random.seed(42)
data = {
    'feature1': np.random.randn(100),
    'feature2': np.random.randn(100),
    'feature3': np.random.randn(100),
    'target': np.random.randint(0, 2, 100)
}
df = pd.DataFrame(data)
df.head()

## Exploratory Data Analysis

In [None]:
analyzer = DataAnalyzer(df)
analyzer.summary_stats()

In [None]:
# Check for missing values
analyzer.check_missing()

## Visualizations

In [None]:
# Distribution plot
plt.figure(figsize=(12, 4))
for i, col in enumerate(['feature1', 'feature2', 'feature3'], 1):
    plt.subplot(1, 3, i)
    plt.hist(df[col], bins=20, edgecolor='black')
    plt.title(f'Distribution of {col}')
    plt.xlabel('Value')
    plt.ylabel('Frequency')
plt.tight_layout()
plt.show()

In [None]:
# Correlation matrix
plt.figure(figsize=(8, 6))
sns.heatmap(df.corr(), annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix')
plt.show()

## Prepare Data for ML

In [None]:
X_train, X_test, y_train, y_test = analyzer.prepare_features('target')
print(f"Training set size: {len(X_train)}")
print(f"Test set size: {len(X_test)}")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    expr,
    lower,
    regexp_replace,
    to_timestamp,
)
from pathlib import Path

from src.common.config import RAW_DIR
from src.de.spark_jobs.traffy_basic_etl import main

RAW_FILE = Path("../data/raw/bangkok_traffy.csv")
OUTPUT_DIR = Path("../data/preprocessed/traffy_cleaned_parquet")

In [None]:
import os, sys
from pathlib import Path

ROOT = Path.cwd().parent
sys.path.append(str(ROOT / "src"))
os.environ["SPARK_LOCAL_DIRS"] = str((ROOT / "spark_tmp").resolve())

from src.de.spark_jobs import traffy_basic_etl as etl

etl.main()


In [None]:
from pathlib import Path
out = Path("data/processed/traffy_cleaned_parquet")
if out.exists():
    if out.is_file():
        out.unlink()
    else:
        import shutil
        shutil.rmtree(out)


In [None]:
%pip install findspark

In [1]:
import os
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk-21"
os.environ["SPARK_HOME"] = "C:\spark"
os.environ["HADOOP_HOME"] = "C:\hadoop"

  os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk-21"
  os.environ["SPARK_HOME"] = "C:\spark"
  os.environ["HADOOP_HOME"] = "C:\hadoop"


In [2]:
import findspark
findspark.init()

In [3]:
spark_url = 'local'

In [4]:
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark Tutorial')\
        .config('spark.ui.port', '4040')\
        .getOrCreate()

spark

In [7]:
cd ..

d:\CEDT02\DSDE\JekTurnRight_dsde


In [8]:
from src.de.spark_jobs.traffy_flood_etl import run_traffy_flood_etl
import os
from pathlib import Path

ROOT = Path.cwd()

INPUT = f"{ROOT}/data/raw/bangkok_traffy.csv"
CLEANED_OUT = f"{ROOT}/data/processed/traffy_clean.parquet"
FLOOD_TS_OUT = f"{ROOT}/data/processed/flood_daily_by_district.parquet"

run_traffy_flood_etl(
    spark,
    input_path=INPUT,
    cleaned_output_path=CLEANED_OUT,
    flood_ts_output_path=FLOOD_TS_OUT
)
spark.stop()


ðŸš€ RUNNING TRAFFY FLOOD ETL PIPELINE
[STEP] Loading raw Traffy CSV...
[STEP] Cleaning + schema validation + Bangkok filter...
[STEP] Adding time columns...
[STEP] Adding flood_flag column...
[STEP] Aggregating daily flood counts per district...
[STEP] Writing cleaned tickets...


Py4JJavaError: An error occurred while calling o260.parquet.
: java.util.concurrent.ExecutionException: Boxed Exception
	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolve(Promise.scala:99)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:278)
	at scala.concurrent.Promise.complete(Promise.scala:57)
	at scala.concurrent.Promise.complete$(Promise.scala:56)
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:104)
	at scala.concurrent.Promise.failure(Promise.scala:109)
	at scala.concurrent.Promise.failure$(Promise.scala:109)
	at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:104)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$2(QueryStageExec.scala:333)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:369)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolve(Promise.scala:99)
		at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:278)
		at scala.concurrent.Promise.complete(Promise.scala:57)
		at scala.concurrent.Promise.complete$(Promise.scala:56)
		at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:104)
		at scala.concurrent.Promise.failure(Promise.scala:109)
		at scala.concurrent.Promise.failure$(Promise.scala:109)
		at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:104)
		at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$2(QueryStageExec.scala:333)
		at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
		at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
		at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
		at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
		... 1 more
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:817)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1415)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1620)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:739)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:961)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:194)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:481)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
