In [None]:
!pip install kafka-python

In [1]:
from kafka import KafkaProducer

try:
    # Attempt to connect to Kafka broker
    producer = KafkaProducer(bootstrap_servers=['172.31.99.61:9092'])

    print("Connection successful: Connected to Kafka on localhost:9092")
    producer.close()
except Exception as e:
    print("Error: Unable to connect to Kafka")
    print(e)


Connection successful: Connected to Kafka on localhost:9092


In [2]:
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers=['172.31.99.61:9092'],
    client_id='kafka_admin_client'
)

def create_new_topic():
    """Checks if the topic 'financial_dataset' exists or not. If not, creates the topic."""
    try:
        # Get the list of topics
        topic_list = admin_client.list_topics()
        print(f"Current topics: {topic_list}")

        # Create the topic if it doesn't exist
        if 'financial_dataset' not in topic_list:
            print("Creating topic 'financial_dataset'...")
            admin_client.create_topics(new_topics=[NewTopic('financial_dataset', 1, 1)], validate_only=False)
            print("Topic 'financial_dataset' successfully created.")
        else:
            print("Topic 'financial_dataset' already exists.")
    except Exception as e:
        print("Error occurred while creating topic:", e)

create_new_topic()


Current topics: ['test', 'financial_dataset', '__consumer_offsets']
Topic 'financial_dataset' already exists.


In [3]:
import pandas as pd
from kafka import KafkaProducer
import json

# Initialize Kafka producer
producer = KafkaProducer(
    bootstrap_servers=['172.31.99.61:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_csv_to_kafka(csv_file, topic_name):
    try:
        # Read the CSV file
        df = pd.read_csv(csv_file)
        print(f"Read {len(df)} records from {csv_file}")

        # Send each row as a JSON message
        for index, row in df.iterrows():
            producer.send(topic_name, value=row.to_dict())
            print(f"Sent: {row.to_dict()}")

        print("All records sent successfully.")
    except Exception as e:
        print("Error while sending data to Kafka:", e)

# Call the function with your CSV file path
csv_file_path = "decrypted_data_union.csv"
send_csv_to_kafka(csv_file_path, "financial_dataset")


Read 4495 records from decrypted_data_union.csv
Sent: {'Date': nan, 'GOLD_ETF': 'GOLD_ETF_GLD', 'SP': 'SP_^GSPC', 'DJ': 'DJ_^DJI', 'EG': 'EG_EGO', 'EU': 'EU_EURUSD=X', 'OF': 'OF_BZ=F', 'OS': 'OS_CL=F', 'SF': 'SF_SI=F', 'USB': 'USB_^TNX', 'PLT': 'PLT_PL=F', 'PLD': 'PLD_PA=F', 'GDX': 'GDX_GDX', 'USO': 'USO_USO', 'USDI': 'USDI_DX-Y.NYB'}
Sent: {'Date': '2007-07-30 00:00:00+00:00', 'GOLD_ETF': '65.7699966430664', 'SP': '1473.9100341796875', 'DJ': '13358.3095703125', 'EG': '21.062030792236328', 'EU': '1.3711966276168823', 'OF': '75.73999786376953', 'OS': '76.83000183105469', 'SF': '12.833999633789062', 'USB': '4.803999900817871', 'PLT': '1284.800048828125', 'PLD': nan, 'GDX': '34.7804069519043', 'USO': '460.0', 'USDI': '80.8499984741211'}
Sent: {'Date': '2007-07-31 00:00:00+00:00', 'GOLD_ETF': '65.79000091552734', 'SP': '1455.27001953125', 'DJ': '13211.990234375', 'EG': '21.875791549682617', 'EU': '1.3672035932540894', 'OF': '77.05000305175781', 'OS': '78.20999908447266', 'SF': '12.94999980

In [4]:
import sys
import warnings
import traceback
import logging

logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')

warnings.filterwarnings('ignore')
# checkpointDir = "file:///tmp/streaming/financial_data_input"
checkpointDir = r"C:\Users\avant\Downloads\checkpoint"

financial_data_index = {
    "settings": {
        "index": {
            "analysis": {
                "analyzer": {
                    "custom_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase", "custom_edge_ngram", "asciifolding"
                        ]
                    }
                },
                "filter": {
                    "custom_edge_ngram": {
                        "type": "edge_ngram",
                        "min_gram": 2,
                        "max_gram": 10
                    }
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "Date": {
                "type": "date",
                "format": "yyyy-MM-dd",
                "ignore_malformed": "true"
            },
            "GOLD_ETF": {"type": "float"},
            "SP": {"type": "float"},
            "DJ": {"type": "float"},
            "EG": {"type": "float"},
            "EU": {"type": "float"},
            "OF": {"type": "float"},
            "OS": {"type": "float"},
            "SF": {"type": "float"},
            "USB": {"type": "float"},
            "PLT": {"type": "float"},
            "PLD": {"type": "float"},
            "GDX": {"type": "float"},
            "USO": {"type": "float"},
            "US": {"type": "float"}
        }
    }
}

In [None]:
!pip install pyspark

In [5]:
import sys
import traceback
import logging
from pyspark.sql import SparkSession

def create_spark_session():
    """
    Creates the Spark Session with suitable configs, disables security manager if needed.
    """
    try:
        # Spark session is established with necessary configurations
        spark = (SparkSession.builder
                 .appName("MinimalSparkSession")
                 .config("spark.security.manager.enabled", "false")  # Disable security manager
                 .config("spark.local.dir", "C:/spark-temp")  # Set a temporary directory for Spark files
                 .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")  # Allow security manager
                 .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0")  # Your necessary packages
                 .config("spark.driver.memory", "2048m")
                 .config("spark.sql.shuffle.partitions", 4)
                 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                 .getOrCreate())
        
        logging.info('Spark session created successfully')
        return spark
        
    except Exception:
        traceback.print_exc(file=sys.stderr)  # Print error traceback
        logging.error("Couldn't create the spark session")
        return None


if __name__ == "__main__":
    # Create Spark session
    spark = create_spark_session()
    
    if spark:
        print("Spark Session successfully created.")
    else:
        print("Failed to create Spark session.")


2024-11-17 03:16:13,629:create_spark_session:INFO:Spark session created successfully


Spark Session successfully created.


In [6]:
def create_initial_dataframe(spark_session):
    """
    Reads the streaming data and creates the initial dataframe accordingly.
    """
    try:
        # Gets the streaming data from topic office_input.
        df = spark_session \
          .readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", "172.31.99.61:9092") \
          .option("subscribe", "financial_data") \
          .load()
        logging.info("Initial dataframe created successfully")
    except Exception as e:
        logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")

    return df

data=create_initial_dataframe(spark)

2024-11-17 03:16:47,786:create_initial_dataframe:INFO:Initial dataframe created successfully


In [7]:
import sys
import warnings
import traceback
import logging
from pyspark.sql.types import FloatType, StringType, DateType
from pyspark.sql import functions as F

logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')

warnings.filterwarnings('ignore')

def create_final_dataframe(df, spark_session):
    """
    Modifies the initial dataframe, and creates the final dataframe for financial data.
    """
    try:
        # Step 1: Extract the financial data from the raw message (assuming it's in CSV format)
        df2 = df.selectExpr("CAST(value AS STRING)")  # Get only the value part of the message

        # Step 2: Split the value column into individual financial fields based on CSV format
        df3 = df2.withColumn("Date", F.split(F.col("value"), ",")[0].cast(DateType())) \
            .withColumn("GOLD_ETF", F.split(F.col("value"), ",")[1].cast(FloatType())) \
            .withColumn("SP", F.split(F.col("value"), ",")[2].cast(FloatType())) \
            .withColumn("DJ", F.split(F.col("value"), ",")[3].cast(FloatType())) \
            .withColumn("EG", F.split(F.col("value"), ",")[4].cast(FloatType())) \
            .withColumn("EU", F.split(F.col("value"), ",")[5].cast(FloatType())) \
            .withColumn("OF", F.split(F.col("value"), ",")[6].cast(FloatType())) \
            .withColumn("OS", F.split(F.col("value"), ",")[7].cast(FloatType())) \
            .withColumn("SF", F.split(F.col("value"), ",")[8].cast(FloatType())) \
            .withColumn("USB", F.split(F.col("value"), ",")[9].cast(FloatType())) \
            .withColumn("PLT", F.split(F.col("value"), ",")[10].cast(FloatType())) \
            .withColumn("PLD", F.split(F.col("value"), ",")[11].cast(FloatType())) \
            .withColumn("GDX", F.split(F.col("value"), ",")[12].cast(FloatType())) \
            .withColumn("USO", F.split(F.col("value"), ",")[13].cast(FloatType())) \
            .withColumn("US", F.split(F.col("value"), ",")[14].cast(FloatType())) \
            .drop(F.col("value"))  # Drop the original 'value' column after splitting

        df3.createOrReplaceTempView("df3")

        # Step 3: Add a custom column for movement type (if needed)
        # Assuming you want to classify based on a threshold, for example, GOLD_ETF being above a certain value
        df4 = spark_session.sql("""
        SELECT
            Date,
            GOLD_ETF,
            SP,
            DJ,
            EG,
            EU,
            OF,
            OS,
            SF,
            USB,
            PLT,
            PLD,
            GDX,
            USO,
            US,
            CASE
                WHEN GOLD_ETF > 1800 THEN 'bullish'
                ELSE 'bearish'
            END AS market_movement
        FROM df3
        """)

        logging.info("Final dataframe created successfully")
        return df4

    except Exception as e:
        logging.error(f"Error in creating final dataframe: {str(e)}")
        traceback.print_exc()
        sys.exit(1)


In [10]:
df = create_final_dataframe(data, spark)

2024-11-17 03:17:51,734:create_final_dataframe:INFO:Final dataframe created successfully


In [21]:
import logging
from elasticsearch import Elasticsearch

# Set up logging configuration
logging.basicConfig(level=logging.INFO,   # Set the level of logging
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

# Create a logger object
logger = logging.getLogger()

def create_elasticsearch_connection():
    """
    Creates the ES connection.
    """
    es = None
    try:
        es = Elasticsearch("http://172.31.99.61:9200")  # Replace with your server's IP
        logger.info(f"Connection {es} created successfully")
    except Exception as e:
        logger.error(f"Error connecting to Elasticsearch: {str(e)}")
        return None
    return es

def check_if_index_exists(es):
    """
    Checks if index financial_data exists. If not, creates it and prints message accordingly.
    """
    if es.indices.exists(index="financial_data"):
        print("Index financial_data already exists")
        logger.info("Index financial_data already exists")
    else:
        es.indices.create(index="financial_data", body=financial_data_index)  # Ensure the content type is compatible
        print("Index financial_data created")
        logger.info("Index financial_data created")

# Create connection and check index
es = create_elasticsearch_connection()
if es is not None:
    check_if_index_exists(es)


2024-11-17 13:41:57,879:create_elasticsearch_connection:INFO:Connection <Elasticsearch([{'host': '172.31.99.61', 'port': 9200}])> created successfully
2024-11-17 13:41:58,103:log_request_success:INFO:PUT http://172.31.99.61:9200/financial_data [status:200 request:0.214s]
2024-11-17 13:41:58,104:check_if_index_exists:INFO:Index financial_data created


Index financial_data created


In [22]:
import pandas as pd

# Basic CSV loading
df = pd.read_csv('decrypted_data_union.csv')
display(df)

Unnamed: 0,Date,GOLD_ETF,SP,DJ,EG,EU,OF,OS,SF,USB,PLT,PLD,GDX,USO,USDI
0,,GOLD_ETF_GLD,SP_^GSPC,DJ_^DJI,EG_EGO,EU_EURUSD=X,OF_BZ=F,OS_CL=F,SF_SI=F,USB_^TNX,PLT_PL=F,PLD_PA=F,GDX_GDX,USO_USO,USDI_DX-Y.NYB
1,2007-07-30 00:00:00+00:00,65.7699966430664,1473.9100341796875,13358.3095703125,21.062030792236328,1.3711966276168823,75.73999786376953,76.83000183105469,12.833999633789062,4.803999900817871,1284.800048828125,,34.7804069519043,460.0,80.8499984741211
2,2007-07-31 00:00:00+00:00,65.79000091552734,1455.27001953125,13211.990234375,21.875791549682617,1.3672035932540894,77.05000305175781,78.20999908447266,12.949999809265137,4.770999908447266,1300.5999755859375,,34.641624450683594,468.239990234375,80.7699966430664
3,2007-08-01 00:00:00+00:00,65.93000030517578,1465.81005859375,13362.3701171875,21.205636978149414,1.367708444595337,75.3499984741211,76.52999877929688,12.887999534606934,4.758999824523926,1288.9000244140625,,34.242652893066406,462.0799865722656,80.87000274658203
4,2007-08-02 00:00:00+00:00,65.88999938964844,1472.199951171875,13463.330078125,21.875791549682617,1.3707079887390137,75.76000213623047,76.86000061035156,12.9350004196167,4.752999782562256,1297.0,,34.52888107299805,461.6000061035156,80.70999908447266
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4490,2024-10-11 00:00:00+00:00,245.47000122070312,5815.02978515625,42863.859375,17.510000228881836,1.0933860540390015,79.04000091552734,75.55999755859375,31.520000457763672,4.072999954223633,985.0,1061.800048828125,40.04999923706055,77.48999786376953,102.88999938964844
4491,2024-10-14 00:00:00+00:00,245.07000732421875,5859.85009765625,43065.21875,17.719999313354492,1.0925379991531372,77.45999908447266,73.83000183105469,31.08300018310547,4.0980000495910645,994.9000244140625,1023.0999755859375,40.15999984741211,75.93000030517578,103.30000305175781
4492,2024-10-15 00:00:00+00:00,245.9199981689453,5815.259765625,42740.421875,17.889999389648438,1.0909169912338257,74.25,70.58000183105469,31.527000427246094,4.038000106811523,987.7000122070312,1004.2000122070312,40.68000030517578,72.76000213623047,103.26000213623047
4493,2024-10-16 00:00:00+00:00,247.14999389648438,5842.47021484375,43077.69921875,17.90999984741211,1.0884116888046265,74.22000122070312,70.38999938964844,31.760000228881836,4.015999794006348,993.7999877929688,1019.0,41.0,72.4000015258789,103.58999633789062


In [23]:
# Remove the row where the data is actually column names
df = df[df.index != 0]  # Remove row with index 1

# Reset the index after removing the row
df = df.reset_index(drop=True)

# If you need to convert the date column to datetime
df['Date'] = pd.to_datetime(df['Date']).dt.date

# Convert numeric columns to appropriate types
numeric_columns = ['GOLD_ETF', 'SP', 'DJ', 'EG', 'EU', 'OF', 'OS', 'SF', 'USB', 'PLT', 'PLD', 'GDX', 'USO', 'USDI']
for col in numeric_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')  # Convert to numeric, setting errors='coerce' will turn invalid parsing into NaN

In [20]:
def start_streaming(df, es):
    """
    Starts the streaming to index financial_data in Elasticsearch.
    """
    checkpointDir = r"C:\Users\avant\Downloads\checkpoint"
    logging.info("Streaming is being started...")
    my_query = (df.writeStream
                   .format("org.elasticsearch.spark.sql")
                   .outputMode("append")
                   .option("es.nodes", "172.31.99.61")  # You may change this to the actual IP if necessary
                   .option("es.port", "9200")
                   .option("es.resource", "financial_data/_doc")  # This specifies the index name
                   .option("checkpointLocation", checkpointDir)  # Location to store checkpoint info
                   .start())

    # Wait for the streaming query to finish
    my_query.awaitTermination()
    logging.info("Streaming finished.")

start_streaming(df,es)


2024-11-17 13:32:52,627:log_request_success:INFO:POST http://172.31.99.61:9200/_bulk [status:200 request:0.103s]
2024-11-17 13:32:52,639:dataframe_to_elasticsearch:ERROR:Error uploading DataFrame: ('405 document(s) failed to index.', [{'index': {'_index': 'financial_data', '_type': '_doc', '_id': 'K0UlOZMB9bTUaDTXZTf4', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'json_parse_exception', 'reason': "Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow\n at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper); line: 1, column: 271]"}}, 'data': {'Date': datetime.date(2007, 7, 30), 'GOLD_ETF': 65.7699966430664, 'SP': 1473.9100341796875, 'DJ': 13358.3095703125, 'EG': 21.062030792236328, 'EU': 1.3711966276168823, 'OF': 75.73999786376953, 'OS': 76.83000183105469, 'SF': 12.833999633789062, 'USB': 4.803999900817871, 'PLT': 1284.800048828125, 'PLD': na

In [None]:
def dataframe_to_elasticsearch(df, index_name):
    """
    Uploads a pandas DataFrame to Elasticsearch in smaller batches with error logging.
    
    :param df: pandas DataFrame to be uploaded.
    :param index_name: Name of the Elasticsearch index.
    """
    try:
        actions = [
            {
                "_index": index_name,
                "_source": row.to_dict()
            }
            for _, row in df.iterrows()
        ]
        
        # Use smaller batch size for bulk API
        chunk_size = 500
        if actions:
            response = helpers.bulk(es, actions, chunk_size=chunk_size, raise_on_error=False)
            logging.info(f"Uploaded {response[0]} documents successfully.")
            logging.warning(f"Failed to upload {response[1]} documents.")
        else:
            logging.warning("No data in DataFrame to upload.")
    except Exception as e:
        logging.error(f"Error during upload: {e}")

def main():
     import pandas as pd
     # Upload DataFrame to Elasticsearch
     dataframe_to_elasticsearch(df, index_name)

if __name__ == "__main__":
     main()

