In [None]:
# Install the Python packages this notebook imports (IPython shell escape).
!pip install yfinance minio py2neo neo4j
# Copy the Neo4j Spark connector jar into Spark's jars directory so the
# "org.neo4j.spark.DataSource" format used later can be loaded.
!sudo cp /home/jovyan/work/jars/neo4j-connector-apache-spark_2.12-4.1.0_for_spark_3.jar /usr/local/spark/jars/

import yfinance as yf
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import col
from minio import Minio
from minio.error import S3Error
from py2neo import Graph, Node, Relationship, Subgraph

# --- MinIO (S3-compatible object store) connection settings ---
# NOTE(review): the access key and secret are hard-coded in the notebook;
# consider supplying them from the environment instead.
s3_host = "minio"
s3_key = "minio"
s3_secret = "SU2orange!"
s3_bucket = "minio-project"

# Full HTTP URL of the endpoint; used for Spark's s3a configuration.
s3_url = f"http://{s3_host}:9000"

# The MinIO client takes a bare "host:port" endpoint (no scheme) and is told
# explicitly not to use TLS.
minio_endpoint = f"{s3_host}:9000"
minio_client = Minio(minio_endpoint, access_key=s3_key, secret_key=s3_secret, secure=False)

# Make sure the target bucket exists, creating it on first run.
# An S3Error is reported but not re-raised, so the notebook keeps going.
try:
    bucket_already_present = minio_client.bucket_exists(s3_bucket)
    if bucket_already_present:
        print(f"Bucket '{s3_bucket}' already exists.")
    else:
        minio_client.make_bucket(s3_bucket)
        print(f"Bucket '{s3_bucket}' created.")
except S3Error as err:
    print(f"Minio error: {err}")

# Build the Spark session. Settings are kept in one table so the s3a (MinIO)
# and Neo4j entries are easy to scan; they are applied in the order listed.
spark_settings = {
    # Hadoop S3A filesystem, pointed at the MinIO endpoint
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.1.2",
    "spark.hadoop.fs.s3a.endpoint": s3_url,
    "spark.hadoop.fs.s3a.access.key": s3_key,
    "spark.hadoop.fs.s3a.secret.key": s3_secret,
    "spark.hadoop.fs.s3a.path.style.access": True,
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # Neo4j connector jar and connection properties
    "spark.jars": "/usr/local/spark/jars/neo4j-connector-apache-spark_2.12-4.1.0_for_spark_3.jar",
    "spark.neo4j.bolt.url": "bolt://neo4j:7687",
    "spark.neo4j.bolt.user": "neo4j",
    "spark.neo4j.bolt.password": "SU2orange!",
}

session_builder = SparkSession.builder.master("local[*]").appName("yfinance-minio-spark")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

# Keep Spark's console output down to errors only.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Ticker symbols to analyze. The whole list is passed to yf.download() in a
# single call, and every pair of symbols is later compared by the correlation
# of their "Close_*" columns.
tickers = ["AAPL", "AMZN", "MSFT", "GOOGL", "TSLA", "BRK-B", "JNJ", "V", "WMT",
           "PG", "UNH", "NVDA", "HD", "DIS", "PYPL", "MA", "VZ", "ADBE", "NFLX",
           "INTC", "CSCO", "KO", "PEP", "T", "PFE", "MRK", "ABT", "XOM", "CVX",
           "CRM", "MCD", "BMY", "NKE", "BA", "WBA", "IBM", "MMM", "GE", "AGG",
           "BIV", "BND", "BSV", "DIA", "EEM", "EFA", "ENZL", "EPHE", "EWA",
           "EWC", "EWG", "EWH", "EWI", "EWJ", "EWK", "EWM", "EWN", "EWQ", "EWS",
           "EWT", "EWU", "EWY", "EWZ", "EZU", "FEZ", "FLRN", "FXI", "GXC", "HYG",
           "IEF", "IJH", "IJR", "INDA", "ITOT", "IWB", "IWD", "IWF", "IWM", "IWO",
           "LQD", "MBB", "MCHI", "MUB", "OIH", "PFF", "QQQ", "REM", "RSX", "SHV",
           "SHY", "SLV", "SPY", "THD", "TLH", "TLT", "TUR", "VB", "VBK", "VBR",
           "VEA", "VGK", "VGT", "VO", "VOE", "VOO", "VPL", "VTI", "VTWO", "VUG",
           "VWO", "XLB", "XLE", "XLF", "XLI", "XLK", "XLP", "XLU", "XLV", "XLY"]

# Download price history for every ticker in a single yfinance request
data = yf.download(tickers, start="2024-05-01")

# The column labels are expected to be tuples (multi-level); join the
# non-empty levels with "_" so each column gets one flat name such as
# "Close_AAPL". The loop variable is deliberately not called `col`, to avoid
# confusion with pyspark.sql.functions.col imported at the top of the file.
data.columns = ['_'.join(filter(None, labels)).strip() for labels in data.columns.values]

# Drop columns that hold no observations at all. A symbol that returned no
# data (presumably delisted or failed downloads -- verify against yfinance
# output) would otherwise remain as an all-NaN column, and the later
# VectorAssembler(handleInvalid="skip") discards every row containing a NaN,
# which would leave no rows to correlate.
data = data.dropna(axis="columns", how="all")

# Turn the date index into a regular "Date" column
data.reset_index(inplace=True)

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(data)

# Convert Date column to string
spark_df = spark_df.withColumn("Date", col("Date").cast("string"))

# Save the Spark DataFrame to Minio as CSV (replacing any previous export)
output_path = f"s3a://{s3_bucket}/stocks_data.csv"
spark_df.write.mode("overwrite").csv(output_path, header=True)

print(f"Data saved to Minio at {output_path}")

#
#
# Pairwise correlation of closing prices, computed with Spark MLlib
selected_columns = [name for name in spark_df.columns if name.startswith("Close_")]

# Pack the closing-price columns into a single vector column, which is the
# input shape Correlation.corr expects. Rows with invalid values are skipped.
vector_col = "corr_features"
assembler = VectorAssembler(
    inputCols=selected_columns,
    outputCol=vector_col,
    handleInvalid="skip",
)
df_vector = assembler.transform(spark_df).select(vector_col)

# The result is a one-row DataFrame whose first field is the matrix
correlation_matrix = Correlation.corr(df_vector, vector_col).head()[0]

# Flatten the upper triangle (i < j) into (correlation, stock1, stock2) rows
# so that each unordered pair appears exactly once.
pair_rows = []
for i, first_column in enumerate(selected_columns):
    for j in range(i + 1, len(selected_columns)):
        pair_rows.append(
            (float(correlation_matrix[i, j]), first_column, selected_columns[j])
        )
correlation_df = spark.createDataFrame(pair_rows, ["correlation", "stock1", "stock2"])

from pyspark.sql.functions import abs as spark_abs

# Keep only pairs whose absolute correlation reaches the threshold
correlation_threshold = 0.5
filtered_correlations_df = correlation_df.where(
    spark_abs(col("correlation")) >= correlation_threshold
)

# Show the filtered correlations
filtered_correlations_df.show()

#
#
# Save the filtered correlations to Neo4j
neo4j_url = "neo4j://neo4j:7687"
neo4j_user = "neo4j"
neo4j_password = "SU2orange!"

# Initialize Neo4j connection
graph = Graph(neo4j_url, auth=(neo4j_user, neo4j_password))

# Build the whole graph client-side and send it in one batch.
# Exactly one Node object is kept per ticker: creating a fresh Node for every
# row would make each ticker appear once per pair it takes part in.
stock_nodes = {}
relationships = []
for row in filtered_correlations_df.collect():
    # Column names look like "Close_<TICKER>"; strip the prefix to get the ticker
    stock1 = row['stock1'].replace("Close_", "")
    stock2 = row['stock2'].replace("Close_", "")
    correlation = row['correlation']

    for symbol in (stock1, stock2):
        if symbol not in stock_nodes:
            stock_nodes[symbol] = Node("Stock", name=symbol)

    relationships.append(
        Relationship(stock_nodes[stock1], "CORRELATES_WITH", stock_nodes[stock2],
                     correlation=correlation)
    )

nodes = list(stock_nodes.values())

if nodes:
    # Merge (not create) keyed on the Stock label and its "name" property, so
    # re-running the notebook updates existing nodes instead of duplicating them.
    subgraph = Subgraph(nodes, relationships)
    graph.merge(subgraph, "Stock", "name")
    print("Correlation results saved to Neo4j.")
else:
    # Nothing passed the threshold; skip the write rather than build an empty subgraph
    print("No correlations met the threshold; nothing written to Neo4j.")

# Cypher query returning every stored correlation edge as a flat row
cypher_query = """
MATCH (s1:Stock)-[r:CORRELATES_WITH]->(s2:Stock)
RETURN s1.name AS stock1, s2.name AS stock2, r.correlation AS correlation
"""

# Read data from Neo4j into a Spark DataFrame.
# Credentials are passed as explicit DataSource options: the database is
# password-protected (the py2neo Graph connection above authenticates with
# the same user/password), and the read previously supplied none.
df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://neo4j:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", neo4j_user) \
    .option("authentication.basic.password", neo4j_password) \
    .option("query", cypher_query) \
    .load()

# Show the DataFrame
df.show()

# Keep only the strongly positive pairs (correlation above 0.8)
strong_correlations_df = df.where(col("correlation") > 0.8)

# Display the strong pairs
strong_correlations_df.show()

# Export the strong pairs to Minio as CSV with a header row, replacing any
# previous export at the same path
output_path = f"s3a://{s3_bucket}/strong_correlations.csv"
strong_writer = strong_correlations_df.write.mode("overwrite")
strong_writer.csv(output_path, header=True)
print(f"Strong correlations saved to Minio at {output_path}")

# Write the strong pairs back to Neo4j as STRONGLY_CORRELATES_WITH edges.
# One Node object is kept per ticker so that a ticker appearing in several
# pairs maps to a single graph node.
stock_nodes = {}
relationships = []
for row in strong_correlations_df.collect():
    stock1 = row['stock1']
    stock2 = row['stock2']
    correlation = row['correlation']

    for symbol in (stock1, stock2):
        if symbol not in stock_nodes:
            stock_nodes[symbol] = Node("Stock", name=symbol)

    relationships.append(
        Relationship(stock_nodes[stock1], "STRONGLY_CORRELATES_WITH", stock_nodes[stock2],
                     correlation=correlation)
    )

nodes = list(stock_nodes.values())

if nodes:
    # Merge on (Stock, name): these tickers were just read from Neo4j, so
    # creating them again would add a second copy of every node.
    subgraph = Subgraph(nodes, relationships)
    graph.merge(subgraph, "Stock", "name")
    print("Strong correlations saved to Neo4j.")
else:
    # No pair exceeded the strong threshold; skip the write
    print("No strong correlations found; nothing written to Neo4j.")
