In [1]:
import json

with open('config.json') as config_file:
    config = json.load(config_file)

mongo_uri = config['MONGO_URI']

!pip install pymongo
from pymongo import MongoClient
import pandas as pd
import hashlib



In [2]:
atlas_client = MongoClient(mongo_uri)
db = atlas_client['stock_db']
collection = db['stocks']
metadata_collection = db['metadata']

In [3]:
def calculate_file_hash(file_path):
    hasher = hashlib.md5()
    with open(file_path, 'rb') as file:
        buf = file.read()
        hasher.update(buf)
    return hasher.hexdigest()


In [4]:
def file_already_loaded(file_hash):
    return metadata_collection.find_one({"file_hash": file_hash}) is not None

def save_file_metadata(file_path, file_hash):
    metadata = {
        "file_name": file_path,
        "file_hash": file_hash,
        "loaded_at": pd.Timestamp.now()
    }
    metadata_collection.insert_one(metadata)


In [5]:
def load_csv_to_mongodb(file_path):
    file_hash = calculate_file_hash(file_path)
    if file_already_loaded(file_hash):
        print(f"File {file_path} has already been loaded previously.")
        return
    
    df = pd.read_csv(file_path)
    collection.insert_many(df.to_dict('records'))
    save_file_metadata(file_path, file_hash)
    print(f"Data from the file {file_path} has been loaded successfully.")


In [6]:
## START The Process

In [7]:
csv_path = './all_stocks_5yr.csv'
load_csv_to_mongodb(csv_path)

File ./all_stocks_5yr.csv has already been loaded previously.


In [8]:
def get_data_from_mongodb():
    data = list(collection.find())
    df = pd.DataFrame(data)
    return df

df_mongo = get_data_from_mongodb()
df_mongo.head()


Unnamed: 0,_id,date,open,high,low,close,volume,Name
0,66972da16ad46174a727aa84,2013-02-08,15.07,15.12,14.63,14.75,8407500,AAL
1,66972da16ad46174a727aa85,2013-02-11,14.89,15.01,14.26,14.46,8882000,AAL
2,66972da16ad46174a727aa86,2013-02-12,14.45,14.51,14.1,14.27,8126000,AAL
3,66972da16ad46174a727aa87,2013-02-13,14.3,14.94,14.25,14.66,10259500,AAL
4,66972da16ad46174a727aa88,2013-02-14,14.94,14.96,13.16,13.99,31879900,AAL


In [15]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Create a Spark session with the desired configurations
spark = SparkSession.builder \
    .appName("StockDataCleaning") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.task.maxDirectResultSize", "4g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.execution.arrow.enabled", "true") \
    .getOrCreate()

schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", StringType(), True),
    StructField("high", StringType(), True),
    StructField("low", StringType(), True),
    StructField("close", StringType(), True),
    StructField("volume", StringType(), True),
    StructField("Name", StringType(), True)
])

def load_data_into_spark(df, schema):
    spark_df = spark.createDataFrame(df, schema=schema)
    return spark_df

# Load data into Spark
spark_df = load_data_into_spark(df_mongo, schema)

# Optionally, repartition the DataFrame to reduce task size
spark_df = spark_df.repartition(100)  # Adjust the number of partitions as needed

# Show the DataFrame
spark_df.printSchema()
spark_df.show(5)





24/07/16 23:22:44 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServ

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalArgumentException: '4294967296' in spark.task.maxDirectResultSize is invalid. The max direct result size is 2GB
	at org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$checkValue$1(ConfigBuilder.scala:108)
	at org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$transform$1(ConfigBuilder.scala:101)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.internal.config.ConfigEntryWithDefault.readFrom(ConfigEntry.scala:141)
	at org.apache.spark.SparkConf.get(SparkConf.scala:261)
	at org.apache.spark.executor.Executor.<init>(Executor.scala:228)
	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:235)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:604)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
