In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pymongo import MongoClient
from bson import ObjectId
from pyspark.sql import Window
from pyspark.sql.functions import lag, expr
import boto3
import pandas as pd
from datetime import datetime
import io

# Step 1: Build (or reuse) the Spark session.
# "spark.jars" is set to a placeholder string; replace it with the real
# file path / URL of the Mongo-Spark connector jar before running.
session_builder = SparkSession.builder.appName("MongoDB to Spark")
session_builder = session_builder.config(
    "spark.jars", "FILE / WEB LOCATION FOR MONGO-SPARK CONNECTOR"
)
spark = session_builder.getOrCreate()

# Report the version of the running session.
print(f"Spark version: {spark.version}")

# Step 2: MongoDB connection details
mongo_uri = "mongodb://localhost:27017"  # Local default; change for another host.
database_name = "apple"
collection_name = "intraday_r"

# Read every document of the collection into a list of dicts.
# The client is used as a context manager so its connections are released as
# soon as the read finishes (or fails); `client`, `db` and `collection` stay
# bound afterwards but refer to a closed client.
with MongoClient(mongo_uri) as client:
    db = client[database_name]
    collection = db[collection_name]

    mongo_data = []
    for doc in collection.find():
        # The top-level ObjectId is converted to its string form before the
        # document is handed to Spark.
        if isinstance(doc.get('_id'), ObjectId):
            doc['_id'] = str(doc['_id'])
        mongo_data.append(doc)

# Fail early with an actionable message: Spark cannot infer a schema from an
# empty list, and its own error does not say which collection was empty.
if not mongo_data:
    raise ValueError(
        f"No documents found in MongoDB collection "
        f"'{database_name}.{collection_name}'; cannot build a DataFrame."
    )

# Step 3: Convert MongoDB data to Spark DataFrame (schema is inferred)
df = spark.createDataFrame(mongo_data)

# Step 4: Window over each symbol's rows in timestamp order, used to look
# one row back.
window_spec = Window.partitionBy("symbol").orderBy("timestamp")

# Add the previous row's volume and close price, then the percent change of
# the current row relative to that previous row.
df = (
    df.withColumn("prev_volume", lag("volume", 1).over(window_spec))
    .withColumn("prev_price", lag("close", 1).over(window_spec))
    .withColumn(
        "percent_change_volume",
        expr("(volume - prev_volume) / prev_volume * 100"),
    )
    .withColumn(
        "percent_change_price",
        expr("(close - prev_price) / prev_price * 100"),
    )
)

# Step 5: Uncomment to inspect the enriched DataFrame
#df.show(truncate=False)

# Convert Spark DataFrame to Pandas (collects all rows onto the driver)
df2 = df.toPandas()

# Set up S3 client.
# NOTE(review): the values below are placeholders. Avoid committing real keys
# in source; prefer boto3's default credential chain (environment variables,
# shared credentials file, or an IAM role).
s3_client = boto3.client(
    "s3",
    aws_access_key_id='AWS ACCESS KEY',
    aws_secret_access_key='AWS SECRET KEY',
    region_name='REGION WHERE S3 BUCKET IS HOSTED'
)

# Generate a unique filename for S3 from the current local time
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
s3_file_name = f"AppleIntraday_{timestamp}.csv"

# Specify your S3 bucket name
bucket_name = 'apple-intraday'

# Serialise the frame to CSV text in memory (no index column) and send it as
# UTF-8 bytes, which is the documented type for put_object's Body.
csv_body = df2.to_csv(index=False)
response = s3_client.put_object(
    Bucket=bucket_name, Key=s3_file_name, Body=csv_body.encode("utf-8")
)

status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

if status == 200:
    print(f"Successful S3 put_object response. Status - {status}")
    # Only announce the object once the upload has actually been confirmed.
    print(f"New file uploaded to S3: s3://{bucket_name}/{s3_file_name}")
else:
    print(f"Unsuccessful S3 put_object response. Status - {status}")


Spark version: 3.5.2
Successful S3 put_object response. Status - 200
New file uploaded to S3: s3://apple-intraday/AppleIntraday_20240820_104721.csv
