In [None]:
from pyspark.sql import SparkSession


# Initialize PySpark with the MongoDB and Elasticsearch connectors.
APP_NAME = "Collecting and Displaying Records"

# Connector packages that Spark resolves (by Maven coordinates) when the session starts.
SPARK_PACKAGES = ",".join([
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "org.elasticsearch:elasticsearch-spark-30_2.12:7.14.2",
])
# Connection URI used by the MongoDB connector for both reads and writes.
# Individual writes can override the `test.coll` namespace with `database`/`collection` options.
MONGO_URI = "mongodb://mongo:27017/test.coll"

spark = (
    SparkSession.builder.appName(APP_NAME)
    .config("spark.jars.packages", SPARK_PACKAGES)
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .getOrCreate()
)
sc = spark.sparkContext
# Silence Spark's INFO/WARN chatter so cell output stays readable.
sc.setLogLevel("ERROR")

print("\nPySpark initialized...")

In [None]:
# Load the on-time performance data from the sibling `data/` directory
# (path is relative to the notebook's working directory).
on_time_df = spark.read.parquet('../data/on_time_performance.parquet')

# Use SQL to count flights per (Year, Month).
# The casts are aliased explicitly: without an alias, Spark may name the column
# after the cast expression. Downstream code (the MongoDB write and the mongosh
# sort on "Year"/"Month") expects plain `Month` and `Year` fields.
on_time_df.createOrReplaceTempView("on_time_dataframe")
total_flights_by_month = spark.sql(
    """SELECT INT(Month) AS Month, INT(Year) AS Year, COUNT(*) AS total_flights
    FROM on_time_dataframe
    GROUP BY INT(Year), INT(Month)
    ORDER BY INT(Year), INT(Month)"""
)

total_flights_by_month.toPandas()

In [None]:
# Persist the per-month flight totals to MongoDB as agile_data_science.flights_by_month.
# NOTE(review): "append" mode writes every row again on each run, so re-running
# this cell leaves duplicate documents in the collection.
mongo_writer = total_flights_by_month.write.format("mongo").mode("append")
mongo_writer.options(database="agile_data_science", collection="flights_by_month").save()

print("Wrote to MongoDB!")

In [None]:
# Read the collection back with the MongoDB shell to check the write,
# sorted ascending by Year, then Month.
! mongosh mongodb://mongo:27017/agile_data_science --eval 'db.flights_by_month.find().sort({"Year": 1, "Month": 1})'

In [None]:
# List the column names of the on-time performance DataFrame, in schema order.
on_time_df.schema.fieldNames()