## Data Ingestion

In [11]:
!pip install pyspark requests



In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
import requests
import pandas as pd
import time
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("IEXCloudIngestion").getOrCreate()

# Define tickers
tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "TSLA", "INTC", "CRM", "ADBE"]

# Your IEX Cloud API Key
API_KEY = 'pk_e274e451b38f4d64b1c19dd3b1c0314c'

# Placeholder for PySpark DataFrames
dfs = []

# Base URL for intra-day data. Here, we're assuming there's a way to fetch minute data by specifying a date.
# This is hypothetical since the exact endpoint may differ.
BASE_URL = "https://cloud.iexapis.com/stable/stock/{}/chart/date/{}?token={}"
MAX_ATTEMPTS = 5
WAIT_SECONDS = 60  # You might want to adjust this based on the API's rate limit reset time

# Define schema for PySpark DataFrame
# schema = StructType([
#     StructField('date', StringType(), True),
#     StructField('minute', StringType(), True),
#     StructField('label', StringType(), True),
#     StructField('high', FloatType(), True),
#     StructField('low', FloatType(), True),
#     StructField('average', FloatType(), True),
#     StructField('volume', FloatType(), True),
#     StructField('notional', FloatType(), True),
#     StructField('numberOfTrades', FloatType(), True),
#     StructField('marketHigh', FloatType(), True),
#     StructField('marketLow', FloatType(), True),
#     StructField('marketAverage', FloatType(), True),
#     StructField('marketVolume', FloatType(), True),
#     StructField('marketNotional', FloatType(), True),
#     StructField('marketNumberOfTrades', FloatType(), True),
#     StructField('open', FloatType(), True),
#     StructField('close', FloatType(), True),
#     StructField('marketOpen', FloatType(), True),
#     StructField('marketClose', FloatType(), True),
#     StructField('changeOverTime', FloatType(), True),
#     StructField('marketChangeOverTime', FloatType(), True),
#     StructField('Ticker', StringType(), True),
# ])

schema = StructType([
    StructField('date', StringType(), True),
    StructField('minute', StringType(), True),
    StructField('label', StringType(), True),
    StructField('high', FloatType(), True),
    StructField('low', FloatType(), True),
    StructField('average', FloatType(), True),
    StructField('open', FloatType(), True),
    StructField('close', FloatType(), True),
    # StructField('changeOverTime', FloatType(), True),
    StructField('Ticker', StringType(), True),
    # Add other fields as necessary
])

for ticker in tickers:
    # First, fetch max history to get date range
    response_max = requests.get(f"https://cloud.iexapis.com/stable/stock/{ticker}/chart/max?token={API_KEY}")
    
    if response_max.status_code == 200:
        max_data = response_max.json()

        # Extract date range (this is hypothetical, you'll need to adjust based on the actual API response structure)
        start_date = datetime.strptime(max_data[0]['date'], '%Y-%m-%d')
        end_date = datetime.strptime(max_data[-1]['date'], '%Y-%m-%d')

        current_date = start_date
        while current_date <= end_date:
            attempts = 0  # Initialize the attempts counter for each date
            wait_seconds = WAIT_SECONDS

            while attempts < MAX_ATTEMPTS:
                # For each date, fetch minute-by-minute data
                response = requests.get(BASE_URL.format(ticker, current_date.strftime('%Y%m%d'), API_KEY))
                attempts += 1

                if response.status_code == 429:
                    print(f"Rate limit exceeded for {ticker} on {current_date.strftime('%Y-%m-%d')}. Retrying in {wait_seconds} seconds...")
                    time.sleep(wait_seconds)
                    wait_seconds *= 2  # Double the wait time for the next attempt
                elif response.status_code == 200:
                    data_json = response.json()
                
                    if data_json:  # Check if data is not empty
                        # Select only the relevant columns
                        relevant_columns = ['date', 'minute', 'label', 'high', 'low', 'average', 'open', 'close']
                        data_pd = pd.DataFrame(data_json)[relevant_columns]

                        # Convert data types to match schema if necessary
                        data_pd['date'] = data_pd['date'].astype(str)
                        data_pd['minute'] = data_pd['minute'].astype(str)
                        data_pd['label'] = data_pd['label'].astype(str)
                        data_pd['high'] = data_pd['high'].astype(float)
                        data_pd['low'] = data_pd['low'].astype(float)
                        data_pd['average'] = data_pd['average'].astype(float)
                        data_pd['open'] = data_pd['open'].astype(float)
                        data_pd['close'] = data_pd['close'].astype(float)
                        # data_pd['changeOverTime'] = data_pd['changeOverTime'].astype(float)
                        data_pd['Ticker'] = ticker  # Ticker is already a string, so no conversion needed

                        # for column_name in data_spark.columns:
                        #     if isinstance(data_spark.schema[column_name].dataType, (IntegerType)):
                        #         data_spark = data_spark.withColumn(column_name, col(column_name).cast(FloatType()))
                        
                        # Convert to PySpark DataFrame using the predefined schema
                        data_spark = spark.createDataFrame(data_pd, schema=schema)

                        dfs.append(data_spark)
                    break  # Exit the inner while loop since the request was successful
                else:
                    print(f"Failed to fetch minute data for {ticker} on {current_date.strftime('%Y-%m-%d')}, Status code: {response.status_code}")
                    break  # Exit the inner while loop since there was a non-rate-limit error
            
            # Check if maximum attempts were reached without a successful request
            if attempts == MAX_ATTEMPTS:
                print(f"Reached maximum number of attempts for {ticker} on {current_date.strftime('%Y-%m-%d')}")
        
            # Move to next date
            current_date += timedelta(days=1)

    else:
        print(f"Failed to fetch max history for {ticker}, Status code: {response_max.status_code}")

# Union all individual dataframes to create a single PySpark DataFrame
final_df = None
if dfs:
    final_df = dfs[0]
    for df in dfs[1:]:
        final_df = final_df.union(df)

    final_df.show()
else:
    print("No dataframes to union")

ConnectionError: HTTPSConnectionPool(host='cloud.iexapis.com', port=443): Max retries exceeded with url: /stable/stock/MSFT/chart/date/20151227?token=pk_e274e451b38f4d64b1c19dd3b1c0314c (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f89033991b0>: Failed to establish a new connection: [Errno 61] Connection refused'))

In [2]:
from pyspark.sql import SparkSession
import requests
import pandas as pd
from datetime import datetime, timedelta
import time

spark = SparkSession.builder.appName("IEXCloudIngestion").getOrCreate()

# Define tickers
tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "TSLA", "INTC", "CRM", "ADBE"]

# Your IEX Cloud API Key
API_KEY = 'pk_e274e451b38f4d64b1c19dd3b1c0314c'

# Placeholder for PySpark DataFrames
dfs = []

# Base URL for intra-day data. Here, we're assuming there's a way to fetch minute data by specifying a date.
# This is hypothetical since the exact endpoint may differ.
BASE_URL = "https://cloud.iexapis.com/stable/stock/{}/chart/date/{}?token={}"
MAX_ATTEMPTS = 5
WAIT_SECONDS = 60  # You might want to adjust this based on the API's rate limit reset time

for ticker in tickers:
    # First, fetch max history to get date range
    response_max = requests.get(f"https://cloud.iexapis.com/stable/stock/{ticker}/chart/max?token={API_KEY}")
    
    if response_max.status_code == 200:
        max_data = response_max.json()

        # Extract date range (this is hypothetical, you'll need to adjust based on the actual API response structure)
        start_date = datetime.strptime(max_data[0]['date'], '%Y-%m-%d')
        end_date = datetime.strptime(max_data[-1]['date'], '%Y-%m-%d')

        current_date = start_date
        while current_date <= end_date:
            attempts = 0  # Initialize the attempts counter for each date
            wait_seconds = WAIT_SECONDS

            while attempts < MAX_ATTEMPTS:
                # For each date, fetch minute-by-minute data
                response = requests.get(BASE_URL.format(ticker, current_date.strftime('%Y%m%d'), API_KEY))
                attempts += 1

                if response.status_code == 429:
                    print(f"Rate limit exceeded for {ticker} on {current_date.strftime('%Y-%m-%d')}. Retrying in {WAIT_SECONDS} seconds...")
                    time.sleep(wait_seconds)
                    wait_seconds *= 2  # Double the wait time for the next attempt
                elif response.status_code == 200:
                    data_json = response.json()
                    
                    if data_json:  # Check if data is not empty
                        # Convert to Pandas DataFrame
                        data_pd = pd.DataFrame(data_json)
                        
                        # Add a column for the ticker
                        data_pd['Ticker'] = ticker

                        # Convert to PySpark DataFrame
                        data_spark = spark.createDataFrame(data_pd)
                        dfs.append(data_spark)
                    break  # Exit the inner while loop since the request was successful
                else:
                    print(f"Failed to fetch minute data for {ticker} on {current_date.strftime('%Y-%m-%d')}")
                    break  # Exit the inner while loop since there was a non-rate-limit error
            
            # Check if maximum attempts were reached without a successful request
            if attempts == MAX_ATTEMPTS:
                print(f"Reached maximum number of attempts for {ticker} on {current_date.strftime('%Y-%m-%d')}")
        
            # Move to next date
            current_date += timedelta(days=1)

    else:
        print(f"Failed to fetch max history for {ticker}")

# Union all individual dataframes to create a single PySpark DataFrame
final_df = dfs[0]
for df in dfs[1:]:
    final_df = final_df.union(df)

final_df.show()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/24 21:02:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None

PySparkValueError: [CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.

In [3]:
data_pd = data_pd.dropna(how='all')

In [None]:
num_rows = final_df.count()
print(num_rows)




40158


                                                                                

In [None]:
final_df.write.csv("stock_data.csv", header=True, mode="overwrite")
