## 1. Verify Spark → HDFS connectivity and to inspect the schema / a few rows.

In [31]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://spark-master:7077").appName("smoke_test").getOrCreate()

# read just the first 1000 lines to check connectivity/schema quickly
df = spark.read.option("header","true").csv("hdfs://namenode:8020/data/flights/2006.csv")
print("schema:")
df.printSchema()
print("show 10 rows:")
df.show(10, truncate=False)
print("sample count (limit 1000 staged):")
print(df.limit(1000).count())

schema:
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- Carri

In [32]:
df = spark.read.option("header","true").csv("hdfs://namenode:8020/data/flights/2006.csv")
print(df.count())

7141922


## 2. Convert the CSV to Parquet (typed, splittable, faster queries), then register it as a Hive external table.

In [33]:
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("etl_flights") \
    .getOrCreate()

src = "hdfs://namenode:8020/data/flights/2006.csv"
out_path = "hdfs://namenode:8020/data/parquet/flights_2006"

# read with inferred schema
df = spark.read.option("header","true").option("inferSchema","true").csv(src)

# light cleaning example: drop rows missing Year/Month if those columns exist
if 'Year' in df.columns and 'Month' in df.columns:
    df_clean = df.dropna(subset=['Year','Month'])
    df_clean.repartition(8).write.mode("overwrite").partitionBy("Year","Month").parquet(out_path)
else:
    df_clean = df
    df_clean.repartition(8).write.mode("overwrite").parquet(out_path)

print("Wrote parquet to:", out_path)

Wrote parquet to: hdfs://namenode:8020/data/parquet/flights_2006


In [34]:
spark = SparkSession.builder.master("spark://spark-master:7077").appName("count_parquet").getOrCreate()
dfp = spark.read.parquet("hdfs://namenode:8020/data/parquet/flights_2006")
print('parquet rows:', dfp.count())

parquet rows: 7141922


In [35]:
# example partitioning by Year and Month if present
if 'Year' in df.columns and 'Month' in df.columns:
    df_clean = df.dropna(subset=['Year','Month'])
    df_clean.repartition(8).write.mode("overwrite").partitionBy("Year","Month").parquet(out_path)
else:
    df.repartition(8).write.mode("overwrite").parquet(out_path)

## 3.  Partition the Parquet output based on Year/Month columns

In [36]:
# stop any existing session cleanly
try:
    spark
    try:
        spark.stop()
    except Exception:
        pass
except NameError:
    pass

spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("register_with_hive") \
    .config("spark.sql.warehouse.dir", "hdfs://namenode:8020/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

print("catalogImplementation:", spark.conf.get("spark.sql.catalogImplementation"))

catalogImplementation: hive


In [37]:
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("etl_flights_write_parquet") \
    .getOrCreate()

src = "hdfs://namenode:8020/data/flights/2006.csv"
out_path = "hdfs://namenode:8020/data/parquet/flights_2006"

print("Reading CSV from", src)
df = spark.read.option("header","true").option("inferSchema","true").csv(src)
print("CSV rows (sample):", df.limit(5).count())

# choose partitioning if Year/Month exist
if 'Year' in df.columns and 'Month' in df.columns:
    print("Partitioning by Year,Month")
    df_clean = df.dropna(subset=['Year','Month'])
    df_clean.repartition(8).write.mode("overwrite").partitionBy("Year","Month").parquet(out_path)
else:
    print("No Year/Month columns found – writing without partitioning")
    df.repartition(8).write.mode("overwrite").parquet(out_path)

print("Wrote parquet to:", out_path)

Reading CSV from hdfs://namenode:8020/data/flights/2006.csv
CSV rows (sample): 5
Partitioning by Year,Month
Wrote parquet to: hdfs://namenode:8020/data/parquet/flights_2006


### 4. Register Hive external table via Spark (enable Hive support):

In [38]:
import traceback

# stop prior session if any (avoids "stopped SparkContext" issues)
try:
    spark.stop()
except Exception:
    pass

# Create SparkSession with Hive support and explicit metastore URI
spark = SparkSession.builder \
    .appName("register_flights_parquet") \
    .master("spark://spark-master:7077") \
    .enableHiveSupport() \
    .config("spark.sql.warehouse.dir", "hdfs://namenode:8020/user/hive/warehouse") \
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083") \
    .getOrCreate()

print("Spark session created. Catalog implementation:", spark.conf.get("spark.sql.catalogImplementation"))

# quick metastore connectivity checks
try:
    print("Databases:")
    spark.sql("SHOW DATABASES").show()
    print("Tables in default:")
    spark.sql("SHOW TABLES IN default").show()
except Exception:
    print("Error while listing catalog objects:")
    traceback.print_exc()

parquet_path = "hdfs://namenode:8020/data/parquet/flights_2006"
table_name = "flights_2006_staged"

# Sanity read direct from Parquet
print("Reading parquet directly to validate data is readable:")
try:
    df = spark.read.parquet(parquet_path)
    df.show(5)
    print("Row count (spark read):", df.count())
except Exception:
    print("Failed to read parquet directly:")
    traceback.print_exc()

# Try to create an EXTERNAL table that points to the parquet directory
print("Creating EXTERNAL table in metastore (will DROP if exists)...")
try:
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    spark.sql(f"CREATE EXTERNAL TABLE {table_name} USING PARQUET LOCATION '{parquet_path}'")
    print("Table created. Verifying count from metastore-backed table:")
    spark.sql(f"SELECT COUNT(*) AS cnt FROM {table_name}").show()
except Exception as e:
    print("CREATE TABLE / verification failed. Full traceback below:")
    traceback.print_exc()
    # attempt to print Java exception detail if available
    try:
        print("Java exception:", e.java_exception.toString())
    except Exception:
        pass
    
    

Spark session created. Catalog implementation: hive
Databases:
+------------+
|databaseName|
+------------+
|     default|
+------------+

Tables in default:
+--------+-------------------+-----------+
|database|          tableName|isTemporary|
+--------+-------------------+-----------+
| default|flights_2006_staged|      false|
+--------+-------------------+-----------+

Reading parquet directly to validate data is readable:
+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+-----+
|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDel

Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o1546.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
Operation not allowed: CREATE EXTERNAL TABLE ... USING(line 1, pos 0)

== SQL ==
CREATE EXTERNAL TABLE flights_2006_staged USING PARQUET LOCATION 'hdfs://namenode:8020/data/parquet/flights_2006'
^^^

	at org.apache.spark.sql.catalyst.parser.ParserUtils$.operationNotAllowed(ParserUtils.scala:41)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateTable$1.apply(SparkSqlParser.scala:404)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateTable$1.apply(SparkSqlParser.scala:401)
	at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserU

In [39]:
spark.sparkContext.cancelAllJobs()
spark.stop()