In [6]:
%glue_version 4.0
%worker_type G.1X
%number_of_workers 3
%idle_timeout 60  

%%configure
{
  "--datalake-formats": "iceberg",
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
}


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 3
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'--datalake-formats': 'iceberg', '--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions'}


In [1]:
import boto3
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 3
Idle Timeout: 60
Session ID: 8e4bd850-7291-4ec9-a11c-669ab663c9c6
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
--datalake-formats iceberg
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Waiting for session 8e4bd850-7291-4ec9-a11c-669ab663c9c6 to get into ready status...
Session 8e4bd850-7291-4ec9-a11c-669ab663c9c6 has been created.



In [2]:
# S3 location used as the Iceberg warehouse path.
BUCKET_NAME = "algotrading-datalake"
BUCKET_PREFIX = ""  # empty, so WAREHOUSE_PATH below ends right after the bucket name and "/"
# Name parts that are joined into the fully qualified table name below.
ICEBERG_CATALOG_NAME = "glue_catalog"
ICEBERG_DATABASE_NAME = "algo_data"
ICEBERG_TABLE_NAME = "hist_ohlcv_daily_alphavantage"
# With the values above this evaluates to "s3://algotrading-datalake/".
WAREHOUSE_PATH = f"s3://{BUCKET_NAME}/{BUCKET_PREFIX}"
# "<catalog>.<database>.<table>", as passed to DataFrame.writeTo().
FULL_TABLE_NAME = f"{ICEBERG_CATALOG_NAME}.{ICEBERG_DATABASE_NAME}.{ICEBERG_TABLE_NAME}"




In [3]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session with an Iceberg catalog named
# ICEBERG_CATALOG_NAME that uses the Glue catalog implementation and S3 file IO,
# with WAREHOUSE_PATH as its warehouse location.
spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", WAREHOUSE_PATH)
    .config(
        f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}",
        "org.apache.iceberg.spark.SparkCatalog",
    )
    .config(f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}.warehouse", WAREHOUSE_PATH)
    .config(
        f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}.catalog-impl",
        "org.apache.iceberg.aws.glue.GlueCatalog",
    )
    .config(
        f"spark.sql.catalog.{ICEBERG_CATALOG_NAME}.io-impl",
        "org.apache.iceberg.aws.s3.S3FileIO",
    )
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .getOrCreate()
)




In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType
# Spark schema for one OHLCV row: a date, a ticker symbol, then five nullable
# double columns. Every field is declared nullable.
schema = StructType(
    [
        StructField("dt", DateType(), True),
        StructField("symbol", StringType(), True),
    ]
    + [
        StructField(column, DoubleType(), True)
        for column in ("open", "high", "low", "close", "volume")
    ]
)




In [21]:
import sys
import requests
from datetime import datetime, timedelta, timezone
import pandas as pd


# --- Configuration ---
import os

# The Alpha Vantage key is read from the ALPHAVANTAGE_API_KEY environment
# variable so that no credential is stored in the notebook.
# NOTE(review): the key that used to be hardcoded on this line has been exposed
# in the notebook history and should be rotated.
API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY", "")
if not API_KEY:
    print("ALPHAVANTAGE_API_KEY is not set; Alpha Vantage requests will likely fail.")
SYMBOLS = ['INTC']  # ticker symbols to download
DAYS_BACK = 5  # handed to OHLCVFetcher as days_back

# --- Data Fetching and Processing Classes (from your original code) ---
class OHLCVFetcher:
    """Download daily OHLCV series from Alpha Vantage's TIME_SERIES_DAILY endpoint.

    Args:
        api_key: API key sent with every request.
        days_back: Width, in days, of the window returned by
            ``get_time_range_str``. It is not applied to the download itself,
            which always requests ``outputsize=full``.
        timeout: Seconds to wait for the HTTP response before ``requests``
            raises a timeout error.
    """

    BASE_URL = 'https://www.alphavantage.co/query'

    def __init__(self, api_key, days_back, timeout=30):
        self.api_key = api_key
        self.days_back = days_back
        self.timeout = timeout

    def get_time_range_str(self):
        """Return ``(time_from, time_to)`` as UTC ``YYYYMMDDTHHMM`` strings.

        ``time_to`` is the current UTC time and ``time_from`` is ``days_back``
        days earlier.
        """
        now = datetime.now(timezone.utc)
        time_to = now.strftime('%Y%m%dT%H%M')
        time_from = (now - timedelta(days=self.days_back)).strftime('%Y%m%dT%H%M')
        return time_from, time_to

    def fetch_ohlcv_for_symbol(self, symbol):
        """Fetch the full daily series for ``symbol``.

        Returns:
            The ``'Time Series (Daily)'`` mapping from the response (date string
            -> field mapping), or an empty dict when the request fails or the
            response carries no series. A dict is always returned so callers
            can safely call ``.items()`` on the result.
        """
        # Passing params lets requests URL-encode the symbol and the key.
        params = {
            'function': 'TIME_SERIES_DAILY',
            'symbol': symbol,
            'apikey': self.api_key,
            'outputsize': 'full',
        }
        response = requests.get(self.BASE_URL, params=params, timeout=self.timeout)
        if response.status_code != 200:
            print(f"Failed to fetch OHLCV for {symbol}")
            return {}

        try:
            data = response.json()
        except ValueError:
            print(f"Failed to fetch OHLCV for {symbol}: response was not valid JSON")
            return {}

        series = data.get('Time Series (Daily)') if isinstance(data, dict) else None
        if not series:
            # The API can answer HTTP 200 with only a message field (invalid
            # key, rate limit, ...); report it instead of hiding it.
            reason = None
            if isinstance(data, dict):
                reason = (
                    data.get('Error Message')
                    or data.get('Note')
                    or data.get('Information')
                )
            print(
                f"Failed to fetch OHLCV for {symbol}: "
                f"{reason or 'no daily series in response'}"
            )
            return {}
        return series

class OHLCVRecord:
    """One daily bar for a symbol, built from a single API series entry.

    The price/volume attributes keep whatever ``values`` held under the
    '1. open' ... '5. volume' keys (``None`` when a key is absent); conversion
    to ``float`` happens only in ``to_dict``.
    """

    def __init__(self, date, values, symbol):
        self.date = date  # stored exactly as received
        self.symbol = symbol
        (
            self.open,
            self.high,
            self.low,
            self.close,
            self.volume,
        ) = (
            values.get(key)
            for key in ('1. open', '2. high', '3. low', '4. close', '5. volume')
        )

    def to_dict(self):
        """Return the bar as a flat dict with the numeric fields cast to float."""
        row = {'dt': self.date, 'symbol': self.symbol}
        for field in ('open', 'high', 'low', 'close', 'volume'):
            row[field] = float(getattr(self, field))
        return row


# Shared fetcher instance plus the list that later cells fill with row dicts.
ohlcv_records = list()
fetcher = OHLCVFetcher(api_key=API_KEY, days_back=DAYS_BACK)




In [6]:
for symbol in SYMBOLS:
    ohlcv = fetcher.fetch_ohlcv_for_symbol(symbol)




In [22]:
for date, values in ohlcv.items():
    ohlcv_tick = OHLCVRecord(date=date, values = values, symbol = symbol)
    ohlcv_records.append(ohlcv_tick.to_dict())




In [23]:
# 4. Create Pandas DataFrame
# Fail with a clear message instead of an opaque KeyError on 'dt' when nothing
# was fetched.
if not ohlcv_records:
    raise ValueError("No OHLCV records were fetched; nothing to load into Spark.")

# Take the column order from the Spark schema rather than from dict key order.
pandas_df = pd.DataFrame(ohlcv_records, columns=schema.fieldNames())
# The schema declares dt as DateType, so convert the date strings to date objects.
pandas_df['dt'] = pd.to_datetime(pandas_df['dt']).dt.date

# 5. Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df, schema=schema)

  for column, series in pdf.iteritems():


In [None]:
# Append to the Iceberg table; if the append fails, fall back to creating the
# table (partitioned by dt, format-version 2) from this DataFrame.
try:
    # Try to append first
    spark_df.writeTo(FULL_TABLE_NAME).append()
except Exception as append_err:
    # The fallback runs for any append failure, not only a missing table, so
    # report the original reason instead of discarding it.
    print(
        f"Append to {FULL_TABLE_NAME} failed ({append_err}); "
        "trying to create the table instead."
    )
    try:
        (
            spark_df.writeTo(FULL_TABLE_NAME)
            .using("iceberg")
            .partitionedBy("dt")  # Partitioning
            .tableProperty("format-version", "2")  # Optional Iceberg version
            .create()
        )
    except Exception as create_err:
        print(f"Failed to create Iceberg table: {create_err}")
        # Nothing was written: re-raise so the cell does not look successful.
        raise