In [None]:
##Data Collection

In [None]:
##Create S3 bucket and copy your data(if not created already)

In [1]:
!aws s3 mb s3://retail-demand-forecasting-datalake

make_bucket: retail-demand-forecasting-datalake


In [2]:
!aws s3 ls

2026-02-17 15:07:06 retail-demand-forecasting-datalake
2026-02-17 14:42:24 sagemaker-studio-u0emxtvemp
2026-02-17 14:42:27 sagemaker-us-east-1-435058202789


In [3]:
!aws s3 cp ../Data/train.csv \s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/train/train.csv

!aws s3 cp ../Data/stores.csv \s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/stores/stores.csv

!aws s3 cp ../Data/oil.csv \s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/oil/oil.csv

!aws s3 cp ../Data/holidays_events.csv \s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/holidays/holidays_events.csv

!aws s3 cp ../Data/transactions.csv \s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/transactions/transactions.csv


upload: ../Data/train.csv to s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/train/train.csv
upload: ../Data/stores.csv to s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/stores/stores.csv
upload: ../Data/oil.csv to s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/oil/oil.csv
upload: ../Data/holidays_events.csv to s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/holidays/holidays_events.csv
upload: ../Data/transactions.csv to s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/transactions/transactions.csv


In [4]:
!aws s3 ls s3://retail-demand-forecasting-datalake/raw/corporacion_favorita/ --recursive

2026-02-17 15:07:40      22309 raw/corporacion_favorita/holidays/holidays_events.csv
2026-02-17 15:07:38      20580 raw/corporacion_favorita/oil/oil.csv
2026-02-17 15:07:37       1387 raw/corporacion_favorita/stores/stores.csv
2026-02-17 15:07:35  121800373 raw/corporacion_favorita/train/train.csv
2026-02-17 15:07:41    1552637 raw/corporacion_favorita/transactions/transactions.csv


In [None]:
##Athena Setup

In [11]:
import boto3
import sagemaker
import pandas as pd
from pyathena import connect
session = boto3.session.Session()
region = session.region_name
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
s3_staging_dir = f"s3://{bucket}/athena/staging/"

s3 = boto3.Session().client(service_name="s3", region_name=region)

In [12]:
conn = connect(
    region_name=region,
    s3_staging_dir=s3_staging_dir
)
cursor = conn.cursor()

In [13]:
bucket = "retail-demand-forecasting-datalake"
database_name = "retail_forecasting"

s3_train_path = f"s3://{bucket}/raw/corporacion_favorita/train/"
s3_stores_path = f"s3://{bucket}/raw/corporacion_favorita/stores/"
s3_oil_path = f"s3://{bucket}/raw/corporacion_favorita/oil/"
s3_holidays_path = f"s3://{bucket}/raw/corporacion_favorita/holidays/"
s3_transactions_path = f"s3://{bucket}/raw/corporacion_favorita/transactions/"


In [14]:
statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name)
print(statement)

CREATE DATABASE IF NOT EXISTS retail_forecasting


In [15]:
pd.read_sql(statement, conn)

  pd.read_sql(statement, conn)


In [16]:
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
df_show.head(5)

  df_show = pd.read_sql(statement, conn)


Unnamed: 0,database_name
0,default
1,retail_forecasting


In [17]:
if database_name in df_show.values:
    ingest_create_athena_db_passed = True

In [18]:
%store ingest_create_athena_db_passed

Stored 'ingest_create_athena_db_passed' (bool)


In [19]:
##Set the parameters

#DB name
database_name = "retail_forecasting"

#Tables name
train_table = "train_sales"
stores_table = "stores"
oil_table = "oil"
holidays_table = "holidays_events"
transactions_table = "transactions"

#stagging area to store the output from Athena
s3_staging_dir = "s3://retail-demand-forecasting-datalake/athena/staging"

In [20]:
##train_sales table
statement_train = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.train_sales (
    id INT,
    date STRING,
    store_nbr INT,
    family STRING,
    sales DOUBLE,
    onpromotion INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '\"'
)
LOCATION '{s3_train_path}'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)
""".format(
    database_name,
    "train_sales",
    s3_train_path
)
print(statement_train)



CREATE EXTERNAL TABLE IF NOT EXISTS retail_forecasting.train_sales (
    id INT,
    date STRING,
    store_nbr INT,
    family STRING,
    sales DOUBLE,
    onpromotion INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '"'
)
LOCATION 's3://retail-demand-forecasting-datalake/raw/corporacion_favorita/train/'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)



In [21]:
##stores table
statement_stores = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.stores (
    store_nbr INT,
    city STRING,
    state STRING,
    type STRING,
    cluster INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '\"'
)
LOCATION '{s3_stores_path}'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)
""".format(
    database_name,
    "stores",
    s3_stores_path
)
print(statement_stores)



CREATE EXTERNAL TABLE IF NOT EXISTS retail_forecasting.stores (
    store_nbr INT,
    city STRING,
    state STRING,
    type STRING,
    cluster INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '"'
)
LOCATION 's3://retail-demand-forecasting-datalake/raw/corporacion_favorita/stores/'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)



In [22]:
##oil table
statement_oil = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.oil (
    date STRING,
    dcoilwtico DOUBLE
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '\"'
)
LOCATION '{s3_oil_path}'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)
""".format(
    database_name,
    "oil",
    s3_oil_path
)
print(statement_oil)



CREATE EXTERNAL TABLE IF NOT EXISTS retail_forecasting.oil (
    date STRING,
    dcoilwtico DOUBLE
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '"'
)
LOCATION 's3://retail-demand-forecasting-datalake/raw/corporacion_favorita/oil/'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)



In [23]:
##holidays_events table
statement_holidays = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.holidays_events (
    date STRING,
    type STRING,
    locale STRING,
    locale_name STRING,
    description STRING,
    transferred BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '\"'
)
LOCATION '{s3_holidays_path}'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)
""".format(
    database_name,
    "holidays_events",
    s3_holidays_path
)
print(statement_holidays)



CREATE EXTERNAL TABLE IF NOT EXISTS retail_forecasting.holidays_events (
    date STRING,
    type STRING,
    locale STRING,
    locale_name STRING,
    description STRING,
    transferred BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar'     = '"'
)
LOCATION 's3://retail-demand-forecasting-datalake/raw/corporacion_favorita/holidays/'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'use.null.for.invalid.data'='true'
)



In [24]:
drop_query = f"""
DROP TABLE IF EXISTS {database_name}.transactions
"""

pd.read_sql(drop_query, conn)

  pd.read_sql(drop_query, conn)


In [30]:
create_query = f"""
CREATE EXTERNAL TABLE {database_name}.transactions (
    date STRING,
    store_nbr INT,
    transactions INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '{s3_transactions_path}'
TBLPROPERTIES (
    'skip.header.line.count'='1'
)
"""

pd.read_sql(create_query, conn)


  pd.read_sql(create_query, conn)


In [26]:
pd.read_sql(f"DROP TABLE IF EXISTS {database_name}.transactions", conn)

  pd.read_sql(f"DROP TABLE IF EXISTS {database_name}.transactions", conn)


In [27]:
#Show db created(retail_forecasting)

cursor.execute("SHOW DATABASES")
cursor.fetchall()

[('default',), ('retail_forecasting',)]

In [36]:
#Show tables in db "retail_forecasting"
statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

  df_show = pd.read_sql(statement, conn)


Unnamed: 0,tab_name
0,holidays_events
1,oil
2,stores
3,train_sales
4,transactions


In [32]:
pd.read_sql(
    "SELECT COUNT(*) AS cnt FROM retail_forecasting.train_sales",
    conn
)

  pd.read_sql(


Unnamed: 0,cnt
0,3000888


In [33]:
statement = """
SELECT date, store_nbr, sales
FROM retail_forecasting.train_sales
ORDER BY date
LIMIT 10
"""

pd.read_sql(statement, conn)

  pd.read_sql(statement, conn)


Unnamed: 0,date,store_nbr,sales
0,2013-01-01,1,0.0
1,2013-01-01,1,0.0
2,2013-01-01,1,0.0
3,2013-01-01,1,0.0
4,2013-01-01,1,0.0
5,2013-01-01,1,0.0
6,2013-01-01,1,0.0
7,2013-01-01,1,0.0
8,2013-01-01,1,0.0
9,2013-01-01,1,0.0


In [35]:
cursor.execute(statement_train)

<pyathena.cursor.Cursor at 0x7f1e3f053170>

In [37]:
cursor.execute(f"DESCRIBE {database_name}.train_sales")
cursor.fetchall()


[('id                  \tstring              \tfrom deserializer   ',),
 ('date                \tstring              \tfrom deserializer   ',),
 ('store_nbr           \tstring              \tfrom deserializer   ',),
 ('family              \tstring              \tfrom deserializer   ',),
 ('sales               \tstring              \tfrom deserializer   ',),
 ('onpromotion         \tstring              \tfrom deserializer   ',)]

In [38]:
pd.read_sql(
    f"SELECT id, date, store_nbr, sales FROM {database_name}.train_sales LIMIT 5",
    conn
)

  pd.read_sql(


Unnamed: 0,id,date,store_nbr,sales
0,0,2013-01-01,1,0.0
1,1,2013-01-01,1,0.0
2,2,2013-01-01,1,0.0
3,3,2013-01-01,1,0.0
4,4,2013-01-01,1,0.0
