<a href="https://colab.research.google.com/github/tannyyaawadhwani-design/puthony/blob/main/datalake.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q boto3 pandas matplotlib
#Importing libraries
import os, time, boto3, botocore
import pandas as pd
import matplotlib.pyplot as plt
from google.colab import files
from io import StringIO

#Upload your access-keys CSV (choose tannyyaacollab_accesskeys.csv)
print("Please upload your access-keys CSV (tannyyaacollab_accesskeys.csv). Use the file chooser that appears.")
uploaded = files.upload()  # interactively choose the CSV

#Read keys from uploaded CSV (try common column names)
# `uploaded` is keyed by file name; take the first one ending in .csv (case-insensitive).
key_file = next((name for name in uploaded if name.lower().endswith(".csv")), None)
if not key_file:
    raise SystemExit("No CSV uploaded. Please upload tannyyaacollab_accesskeys.csv and re-run this cell.")

df_keys = pd.read_csv(key_file)
# Candidate header names for the key columns, in order of preference. find_col()
# tries exact matches first and then retries case-insensitively. Each entry is
# listed once.
possible_access_cols = ["Access key ID", "Access key id", "Access Key ID", "AccessKeyId", "access_key_id"]
possible_secret_cols = ["Secret access key", "Secret Access Key", "SecretAccessKey", "SecretKey", "secret_access_key"]

def find_col(df, candidates):
    """Return the label of the first column of *df* that matches a candidate.

    Candidates are first tried in order for an exact match. If none matches,
    they are tried again ignoring case and leading/trailing whitespace. Column
    labels that are not strings (e.g. integers) are compared through ``str()``
    rather than crashing on ``.lower()``.

    Args:
        df: DataFrame whose ``columns`` are searched.
        candidates: Iterable of column names, most preferred first. Generators
            are accepted; the iterable is materialized once because it is
            scanned twice.

    Returns:
        The matching label exactly as it appears in ``df.columns``, or None
        if nothing matches.
    """
    candidates = list(candidates)
    for c in candidates:
        if c in df.columns:
            return c
    # Normalized label -> original label. If two labels normalize to the same
    # key, the later column wins.
    normalized = {str(col).strip().lower(): col for col in df.columns}
    for c in candidates:
        key = str(c).strip().lower()
        if key in normalized:
            return normalized[key]
    return None

# Resolve which CSV columns hold the access key ID and the secret key.
access_col = find_col(df_keys, possible_access_cols)
secret_col = find_col(df_keys, possible_secret_cols)
if access_col is None or secret_col is None:
    print("Couldn't autodetect Access Key or Secret Key columns. Here are the CSV columns:", list(df_keys.columns))
    raise SystemExit("Rename columns or provide a CSV with 'Access key ID' and 'Secret access key' columns.")
if df_keys.empty:
    # iloc[0] would otherwise fail with a bare IndexError on a header-only file.
    raise SystemExit("The access-keys CSV has column headers but no key rows.")

# Only the first data row is used. Blank cells come back from read_csv as NaN,
# which str() would turn into the literal text "nan", so treat them as empty.
first_row = df_keys.iloc[0]
raw_access, raw_secret = first_row[access_col], first_row[secret_col]
access_key = "" if pd.isna(raw_access) else str(raw_access).strip()
secret_key = "" if pd.isna(raw_secret) else str(raw_secret).strip()
if not access_key or not secret_key:
    raise SystemExit("The first row of the access-keys CSV is missing the access key ID or the secret access key.")

#Set environment vars for this Colab session
os.environ.update({
    "AWS_ACCESS_KEY_ID": access_key,
    "AWS_SECRET_ACCESS_KEY": secret_key,
})

# Default region (change if you want)
DEFAULT_REGION = "eu-north-1"   # change if needed

# setdefault keeps any value already present in the environment and returns
# whichever value ends up stored, so it doubles as the lookup.
REGION = os.environ.setdefault("AWS_DEFAULT_REGION", DEFAULT_REGION)
BUCKET = os.environ.setdefault("DATA_LAKE_BUCKET", "tanya-datalake-demo-2025")
ATHENA_DB = os.environ.get("ATHENA_DB", "demo_db")
ATHENA_TABLE = os.environ.get("ATHENA_TABLE", "orders")
# Athena writes query result files under this prefix of the same bucket.
ATHENA_OUTPUT = "s3://" + BUCKET + "/athena-results/"

print("Using AWS region:", REGION)
print("Using S3 bucket:", BUCKET)

#Creating local sample CSV
# Header plus eight synthetic orders; the text ends with a trailing newline.
_sample_rows = [
    "order_id,customer_id,amount,order_date",
    "1,tanya_001,100.5,2025-09-01",
    "2,shivani_002,50.0,2025-09-01",
    "3,rishi_003,75.25,2025-09-02",
    "4,shubham_004,123.00,2025-09-03",
    "5,tanushree_005,10.00,2025-09-03",
    "6,priyanka_006,500.00,2025-09-04",
    "7,ankita_007,9.99,2025-09-04",
    "8,khushi_008,250.00,2025-09-05",
]
sample_csv = "\n".join(_sample_rows) + "\n"
LOCAL_FILE = "sample_orders.csv"
with open(LOCAL_FILE, "w") as out_fh:
    out_fh.write(sample_csv)
print("Created sample CSV:", LOCAL_FILE)

#Init boto3 clients
# Both service clients are bound to the region resolved above.
s3, athena = (boto3.client(service_name, region_name=REGION) for service_name in ("s3", "athena"))

#Ensure bucket exists and upload the CSV
def ensure_bucket_and_upload(bucket, local_file, s3_key, region=None):
    """Make sure *bucket* exists, creating it if it is missing, then upload a file.

    A newly created bucket gets versioning enabled. Only a "not found" answer
    from HeadBucket triggers creation. Any other error (for example 403 when
    the name belongs to another account) is re-raised rather than being masked
    by a create_bucket call that cannot succeed.

    Args:
        bucket: Target bucket name.
        local_file: Path of the local file to upload.
        s3_key: Object key to write inside the bucket.
        region: Region for a newly created bucket; defaults to the module-level
            REGION.

    Raises:
        botocore.exceptions.ClientError: if the bucket is inaccessible or the
            create / versioning / upload calls fail.
    """
    region = region or REGION
    try:
        s3.head_bucket(Bucket=bucket)
        print("Bucket exists:", bucket)
    except botocore.exceptions.ClientError as e:
        code = e.response.get("Error", {}).get("Code", "")
        # HeadBucket returns no body, so a missing bucket surfaces as the bare HTTP status "404".
        if code not in ("404", "NoSuchBucket", "NotFound"):
            raise
        print("Bucket not found. Attempting to create bucket:", bucket)
        create_args = {"Bucket": bucket}
        # us-east-1 is S3's default location and rejects an explicit LocationConstraint.
        if region and region != "us-east-1":
            create_args["CreateBucketConfiguration"] = {"LocationConstraint": region}
        s3.create_bucket(**create_args)
        s3.put_bucket_versioning(Bucket=bucket, VersioningConfiguration={"Status": "Enabled"})
        print("Created bucket:", bucket)
    s3.upload_file(local_file, bucket, s3_key)
    print(f"Uploaded {local_file} to s3://{bucket}/{s3_key}")

# Object key under raw/orders/, the prefix the Athena table's LOCATION points at.
S3_KEY = "raw/orders/sample_orders.csv"
ensure_bucket_and_upload(bucket=BUCKET, local_file=LOCAL_FILE, s3_key=S3_KEY)

#Helper to run Athena SQL and wait
def run_athena_sql(sql, database=ATHENA_DB, output=ATHENA_OUTPUT, wait=True, poll_seconds=1,
                   timeout_seconds=None):
    """Submit *sql* to Athena and optionally block until it finishes.

    Args:
        sql: Query text to execute.
        database: Database placed in the QueryExecutionContext.
        output: S3 URI where Athena writes result files.
        wait: If False, return right after submission without polling.
        poll_seconds: Delay between status checks while waiting.
        timeout_seconds: Optional upper bound on the wait. None (the default)
            waits indefinitely.

    Returns:
        The QueryExecutionId when ``wait`` is False. Otherwise a
        ``(state, query_execution_id)`` tuple, where state is SUCCEEDED,
        FAILED or CANCELLED.

    Raises:
        TimeoutError: if ``timeout_seconds`` elapses before the query reaches a
            terminal state. A stop request is sent to Athena first.
    """
    resp = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output}
    )
    qid = resp["QueryExecutionId"]
    if not wait:
        return qid
    terminal_states = ("SUCCEEDED", "FAILED", "CANCELLED")
    # monotonic() is unaffected by wall-clock adjustments during polling.
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while True:
        status = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
        if status in terminal_states:
            return status, qid
        if deadline is not None and time.monotonic() >= deadline:
            # Ask Athena to stop the query so it does not keep running unattended.
            athena.stop_query_execution(QueryExecutionId=qid)
            raise TimeoutError(f"Athena query {qid} still {status} after {timeout_seconds}s; stop requested.")
        time.sleep(poll_seconds)

#Creating Athena database and external table (CSV)
def _raise_if_athena_failed(status, qid, step):
    """Raise RuntimeError with Athena's StateChangeReason unless *status* is SUCCEEDED."""
    if status == "SUCCEEDED":
        return
    info = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]
    reason = info.get("StateChangeReason", "")
    raise RuntimeError(f"{step} failed: Athena query {qid} ended in state {status}. {reason}")


print("Creating Athena database and table...")
status, qid = run_athena_sql(f"CREATE DATABASE IF NOT EXISTS {ATHENA_DB};")
print("Create DB status:", status)
# Stop here rather than letting later statements fail with a less specific error.
_raise_if_athena_failed(status, qid, "CREATE DATABASE")

ddl = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {ATHENA_DB}.{ATHENA_TABLE} (
  order_id INT,
  customer_id STRING,
  amount DOUBLE,
  order_date STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",",
  "quoteChar"     = '"'
)
LOCATION 's3://{BUCKET}/raw/orders/'
TBLPROPERTIES ("skip.header.line.count"="1");
"""
status, qid = run_athena_sql(ddl)
print("Create table status:", status)
_raise_if_athena_failed(status, qid, "CREATE TABLE")

#Query Athena and return pandas DataFrame
def query_athena_to_df(sql, database=ATHENA_DB, output=ATHENA_OUTPUT):
    """Run *sql* on Athena, wait for it to finish, and return all rows as a DataFrame.

    Every cell is taken from Athena's ``VarCharValue`` and is therefore a
    string. Cells without a ``VarCharValue`` become None. Results are read
    through the GetQueryResults paginator, because a single call returns at
    most 1000 rows.

    Args:
        sql: Query text; it is assumed to return a header row (e.g. a SELECT).
        database: Database placed in the QueryExecutionContext.
        output: S3 URI where Athena writes result files.

    Returns:
        pandas.DataFrame with one column per result column.

    Raises:
        RuntimeError: if the query ends FAILED or CANCELLED.
    """
    print("Running query:", sql)
    resp = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output}
    )
    qid = resp["QueryExecutionId"]
    # Poll until Athena reports a terminal state.
    while True:
        r = athena.get_query_execution(QueryExecutionId=qid)
        st = r["QueryExecution"]["Status"]["State"]
        if st in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if st != "SUCCEEDED":
        reason = r["QueryExecution"]["Status"].get("StateChangeReason", "")
        raise RuntimeError(f"Athena query {qid} failed: {st}. {reason}")

    # Collect every page of results.
    cols = None
    rows = []
    pages = athena.get_paginator("get_query_results").paginate(QueryExecutionId=qid)
    for page_index, page in enumerate(pages):
        result_set = page["ResultSet"]
        if cols is None:
            cols = [c["Name"] for c in result_set["ResultSetMetadata"]["ColumnInfo"]]
        page_rows = result_set["Rows"]
        # The column-name header row appears only once, at the top of the first page.
        if page_index == 0:
            page_rows = page_rows[1:]
        rows.extend([d.get("VarCharValue") for d in row["Data"]] for row in page_rows)
    return pd.DataFrame(rows, columns=cols)

#Read table into DataFrame
df = query_athena_to_df(f"SELECT * FROM {ATHENA_TABLE} LIMIT 100;")
print("Rows returned:", len(df))
display(df)

#Converting types and save
# query_athena_to_df yields cell values as strings (or None), so cast the
# amount column to float before writing the processed copy.
if "amount" in df.columns:
    df = df.assign(amount=df["amount"].astype(float))

OUT_LOCAL = "athena_query_result.csv"
df.to_csv(OUT_LOCAL, index=False)
print("Saved query result to local file:", OUT_LOCAL)

#Uploading processed CSV to S3 processed zone
# Same file name, placed under the processed/ prefix.
processed_key = "processed/orders/" + OUT_LOCAL
s3.upload_file(Filename=OUT_LOCAL, Bucket=BUCKET, Key=processed_key)
print(f"Uploaded processed CSV to s3://{BUCKET}/{processed_key}")

#bar chart
# Plot only when the needed columns exist and there is at least one row to draw.
if "customer_id" in df.columns and "amount" in df.columns and not df.empty:
    # DataFrame.plot draws into a brand-new figure unless an Axes is passed, so
    # calling plt.figure(figsize=...) beforehand would leave an empty extra
    # figure. Pass figsize to plot() and style the Axes it returns instead.
    ax = df.plot(kind="bar", x="customer_id", y="amount", legend=False, figsize=(9, 4))
    ax.set_title("Order Amounts per Customer")
    ax.set_ylabel("amount")
    ax.figure.tight_layout()
    plt.show()

print("All done. Remember to rotate or delete access keys after testing if these are long-lived keys.")



