In [1]:
import os
import shutil
from datetime import datetime

import pandas as pd
import awswrangler as wr
from mojap_metadata import Metadata
from arrow_pd_parser import reader
from typing import  Dict, Union, Optional

In [5]:
import pyarrow.parquet as pq
from arrow_pd_parser import reader

def load_partitioned_parquets(s3_path, partitions=None):
    """
    Load Parquet files iteratively from an S3 bucket when the file names contain 'part'.
    
    Args:
        s3_path (str): The S3 path to the directory containing the Parquet files.
        partitions (list, optional): A list of partition values to filter files.
    
    Returns:
        pandas.DataFrame: A Pandas DataFrame containing the combined data from all files.
    """
    # Create an empty list to store the data
    data_frames = []
    
    # Create a ParquetDataset from the S3 path
    dataset = pq.ParquetDataset(s3_path, filesystem=reader.s3_fs)
    
    # Iterate over the Parquet file pieces
    for piece in dataset.pieces:
        # Check if the file path contains 'part'
        if 'part' in piece.path:
            # If partitions are provided, filter files based on partition values
            if partitions and any(partition not in piece.path for partition in partitions):
                continue
            
            # Load the Parquet file into a DataFrame
            df = reader.parquet.read(piece.path)
            data_frames.append(df)
    
    # Concatenate all DataFrames into a single DataFrame
    combined_df = pd.concat(data_frames, ignore_index=True)
    
    return combined_df

In [None]:
# Load all Parquet files with 'part' in the name from the S3 path
combined_df = load_partitioned_parquets("s3://your-bucket-name/path/to/parquet/data")

In [None]:
# Load Parquet files with 'part' in the name and filter by partition values
partitions = ['partition1', 'partition2']
combined_df = load_partitioned_parquets("s3://your-bucket-name/path/to/parquet/data", partitions=partitions)

In [2]:
def load_data_from_s3(LOCAL=True, s3_url: Optional[str] = None):
    if LOCAL:
        current_dir = os.getcwd()
        relative_path = os.path.join("data", "people-1000.csv")
        #s3_path = os.path.join(current_dir, relative_path)
        s3_path = "C:/Users/damil/Documents/Data_Engr/airflow-de-intro-project-main/data/people-1000.csv"
    else:
        if s3_url is None:
            raise ValueError("s3_url must be provided when LOCAL=False")
        s3_path = s3_url

    df = reader.read(s3_path)
    return df

In [6]:
url = "s3://dami-test-bucket786567/de-intro/land/people-part1.parquet"
reader.parquet.read(url)


OSError: When getting information for key 'de-intro/land/people-part1.parquet' in bucket 'dami-test-bucket786567': AWS Error ACCESS_DENIED during HeadObject operation: No response body.

In [3]:
dff = load_data_from_s3()
dff

Unnamed: 0,Index,User Id,First Name,Last Name,Sex,Email,Phone,Date of birth,Job Title
0,1,8717bbf45cCDbEe,Shelia,Mahoney,Male,pwarner@example.org,857.139.8239,2014-01-27,Probation officer
1,2,3d5AD30A4cD38ed,Jo,Rivers,Female,fergusonkatherine@example.net,+1-950-759-8687,1931-07-26,Dancer
2,3,810Ce0F276Badec,Sheryl,Lowery,Female,fhoward@example.org,(599)782-0605,2013-11-25,Copy
3,4,BF2a889C00f0cE1,Whitney,Hooper,Male,zjohnston@example.com,+1-939-130-6258,2012-11-17,Counselling psychologist
4,5,9afFEafAe1CBBB9,Lindsey,Rice,Female,elin@example.net,(390)417-1635x3010,1923-04-15,Biomedical engineer
...,...,...,...,...,...,...,...,...,...
995,996,fedF4c7Fd9e7cFa,Kurt,Bryant,Female,lyonsdaisy@example.net,021.775.2933,1959-01-05,Personnel officer
996,997,ECddaFEDdEc4FAB,Donna,Barry,Female,dariusbryan@example.com,001-149-710-7799x721,2001-10-06,Education administrator
997,998,2adde51d8B8979E,Cathy,Mckinney,Female,georgechan@example.org,+1-750-774-4128x33265,1918-05-13,Commercial/residential surveyor
998,999,Fb2FE369D1E171A,Jermaine,Phelps,Male,wanda04@example.net,(915)292-2254,1971-08-31,Ambulance person


In [4]:
def load_metadata() -> Metadata:
    #metadata = 'data\metadata\intro-project-metadata.json'
    metadata =  "C:/Users/damil/Documents/Data_Engr/airflow-de-intro-project-main/data/metadata/people.json"
    metadata = Metadata.from_json(metadata)
    return metadata

def update_metadata() -> Metadata:
    
    # Load the metadata schema
    metadata = load_metadata()
    # Add new columns to the metadata

    metadata.update_column(
        {"name":"mojap_start_datetime", "type":"string"})
    metadata.update_column({"name":"mojap_image_tag", "type":"string"})
    metadata.update_column(
        {"name":"mojap_raw_filename", "type":"string"})
    metadata.update_column(
        {"name":"mojap_task_timestamp", "type":"string"})
    return metadata

In [5]:
def update_metadata():  
    metadata = load_metadata()

    new_columns = [
        {"name": "mojap_start_datetime", "type": "timestamp(s)", 
         "datetime_format": "%Y-%m-%dT%H:%M:%S", "description": "extraction start date"},
        {"name": "mojap_image_tag", "type": "string", "description": "image version"},
        {"name": "mojap_raw_filename", "type": "string", "description": ""},
        {"name": "mojap_task_timestamp", "type": "timestamp(s)", 
         "datetime_format": "%Y-%m-%dT%H:%M:%S", "description": ""}
    ]
    
    for new_column in new_columns:
        metadata.update_column(new_column)
    
    return metadata

In [6]:
def cast_columns_to_correct_types(df):
    metadata = update_metadata()
    
    for column in metadata.columns:
        column_name = column['name']
        column_type = column['type']
        
        # handle potential instances of missing/new columns
        if column_name not in df.columns:
            if column_type == 'timestamp(s)':
                df[column_name] = pd.NaT 
            elif column_type == 'string':
                df[column_name] = ''
            else:
                df[column_name] = pd.NA 
                
        # Now cast the column to the correct type
        if column_type == 'timestamp(s)':
            df[column_name] = pd.to_datetime(df[column_name], format=column.get('datetime_format', "%Y-%m-%dT%H:%M:%S"))
        else:
            df[column_name] = df[column_name].astype(column_type)
    return df

In [7]:
metadata = update_metadata()
#metadata = load_metadata()

In [8]:
dff

Unnamed: 0,Index,User Id,First Name,Last Name,Sex,Email,Phone,Date of birth,Job Title
0,1,8717bbf45cCDbEe,Shelia,Mahoney,Male,pwarner@example.org,857.139.8239,2014-01-27,Probation officer
1,2,3d5AD30A4cD38ed,Jo,Rivers,Female,fergusonkatherine@example.net,+1-950-759-8687,1931-07-26,Dancer
2,3,810Ce0F276Badec,Sheryl,Lowery,Female,fhoward@example.org,(599)782-0605,2013-11-25,Copy
3,4,BF2a889C00f0cE1,Whitney,Hooper,Male,zjohnston@example.com,+1-939-130-6258,2012-11-17,Counselling psychologist
4,5,9afFEafAe1CBBB9,Lindsey,Rice,Female,elin@example.net,(390)417-1635x3010,1923-04-15,Biomedical engineer
...,...,...,...,...,...,...,...,...,...
995,996,fedF4c7Fd9e7cFa,Kurt,Bryant,Female,lyonsdaisy@example.net,021.775.2933,1959-01-05,Personnel officer
996,997,ECddaFEDdEc4FAB,Donna,Barry,Female,dariusbryan@example.com,001-149-710-7799x721,2001-10-06,Education administrator
997,998,2adde51d8B8979E,Cathy,Mckinney,Female,georgechan@example.org,+1-750-774-4128x33265,1918-05-13,Commercial/residential surveyor
998,999,Fb2FE369D1E171A,Jermaine,Phelps,Male,wanda04@example.net,(915)292-2254,1971-08-31,Ambulance person


In [9]:
for column in metadata.columns:
    print(column['name'], column['type'])

User Id string
First Name string
Last Name string
Sex string
Email string
Phone string
Date of birth timestamp(s)
Job Title string
Source extraction date timestamp(s)
mojap_start_datetime timestamp(s)
mojap_image_tag string
mojap_raw_filename string
mojap_task_timestamp timestamp(s)


In [10]:
cast_columns_to_correct_types(dff)

Unnamed: 0,Index,User Id,First Name,Last Name,Sex,Email,Phone,Date of birth,Job Title,Source extraction date,mojap_start_datetime,mojap_image_tag,mojap_raw_filename,mojap_task_timestamp
0,1,8717bbf45cCDbEe,Shelia,Mahoney,Male,pwarner@example.org,857.139.8239,2014-01-27,Probation officer,NaT,NaT,,,NaT
1,2,3d5AD30A4cD38ed,Jo,Rivers,Female,fergusonkatherine@example.net,+1-950-759-8687,1931-07-26,Dancer,NaT,NaT,,,NaT
2,3,810Ce0F276Badec,Sheryl,Lowery,Female,fhoward@example.org,(599)782-0605,2013-11-25,Copy,NaT,NaT,,,NaT
3,4,BF2a889C00f0cE1,Whitney,Hooper,Male,zjohnston@example.com,+1-939-130-6258,2012-11-17,Counselling psychologist,NaT,NaT,,,NaT
4,5,9afFEafAe1CBBB9,Lindsey,Rice,Female,elin@example.net,(390)417-1635x3010,1923-04-15,Biomedical engineer,NaT,NaT,,,NaT
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,996,fedF4c7Fd9e7cFa,Kurt,Bryant,Female,lyonsdaisy@example.net,021.775.2933,1959-01-05,Personnel officer,NaT,NaT,,,NaT
996,997,ECddaFEDdEc4FAB,Donna,Barry,Female,dariusbryan@example.com,001-149-710-7799x721,2001-10-06,Education administrator,NaT,NaT,,,NaT
997,998,2adde51d8B8979E,Cathy,Mckinney,Female,georgechan@example.org,+1-750-774-4128x33265,1918-05-13,Commercial/residential surveyor,NaT,NaT,,,NaT
998,999,Fb2FE369D1E171A,Jermaine,Phelps,Male,wanda04@example.net,(915)292-2254,1971-08-31,Ambulance person,NaT,NaT,,,NaT


In [11]:
def add_mojap_columns_to_dataframe(df):

    df["mojap_start_datetime"] = pd.to_datetime(df["Source extraction date"])
    df["mojap_image_tag"] = os.environ.get("AIRFLOW_IMAGE_TAG", "unknown")
    df["mojap_raw_filename"] = "people-100000.csv"
    df["mojap_task_timestamp"] = pd.to_datetime(
        os.environ.get("AIRFLOW_TASK_TIMESTAMP", datetime.now())
    )

    return df

In [12]:
df1 = add_mojap_columns_to_dataframe(dff)
df1

Unnamed: 0,Index,User Id,First Name,Last Name,Sex,Email,Phone,Date of birth,Job Title,Source extraction date,mojap_start_datetime,mojap_image_tag,mojap_raw_filename,mojap_task_timestamp
0,1,8717bbf45cCDbEe,Shelia,Mahoney,Male,pwarner@example.org,857.139.8239,2014-01-27,Probation officer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
1,2,3d5AD30A4cD38ed,Jo,Rivers,Female,fergusonkatherine@example.net,+1-950-759-8687,1931-07-26,Dancer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
2,3,810Ce0F276Badec,Sheryl,Lowery,Female,fhoward@example.org,(599)782-0605,2013-11-25,Copy,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
3,4,BF2a889C00f0cE1,Whitney,Hooper,Male,zjohnston@example.com,+1-939-130-6258,2012-11-17,Counselling psychologist,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
4,5,9afFEafAe1CBBB9,Lindsey,Rice,Female,elin@example.net,(390)417-1635x3010,1923-04-15,Biomedical engineer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,996,fedF4c7Fd9e7cFa,Kurt,Bryant,Female,lyonsdaisy@example.net,021.775.2933,1959-01-05,Personnel officer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
996,997,ECddaFEDdEc4FAB,Donna,Barry,Female,dariusbryan@example.com,001-149-710-7799x721,2001-10-06,Education administrator,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
997,998,2adde51d8B8979E,Cathy,Mckinney,Female,georgechan@example.org,+1-750-774-4128x33265,1918-05-13,Commercial/residential surveyor,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
998,999,Fb2FE369D1E171A,Jermaine,Phelps,Male,wanda04@example.net,(915)292-2254,1971-08-31,Ambulance person,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439


In [13]:
cast_columns_to_correct_types(df1)

Unnamed: 0,Index,User Id,First Name,Last Name,Sex,Email,Phone,Date of birth,Job Title,Source extraction date,mojap_start_datetime,mojap_image_tag,mojap_raw_filename,mojap_task_timestamp
0,1,8717bbf45cCDbEe,Shelia,Mahoney,Male,pwarner@example.org,857.139.8239,2014-01-27,Probation officer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
1,2,3d5AD30A4cD38ed,Jo,Rivers,Female,fergusonkatherine@example.net,+1-950-759-8687,1931-07-26,Dancer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
2,3,810Ce0F276Badec,Sheryl,Lowery,Female,fhoward@example.org,(599)782-0605,2013-11-25,Copy,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
3,4,BF2a889C00f0cE1,Whitney,Hooper,Male,zjohnston@example.com,+1-939-130-6258,2012-11-17,Counselling psychologist,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
4,5,9afFEafAe1CBBB9,Lindsey,Rice,Female,elin@example.net,(390)417-1635x3010,1923-04-15,Biomedical engineer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,996,fedF4c7Fd9e7cFa,Kurt,Bryant,Female,lyonsdaisy@example.net,021.775.2933,1959-01-05,Personnel officer,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
996,997,ECddaFEDdEc4FAB,Donna,Barry,Female,dariusbryan@example.com,001-149-710-7799x721,2001-10-06,Education administrator,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
997,998,2adde51d8B8979E,Cathy,Mckinney,Female,georgechan@example.org,+1-750-774-4128x33265,1918-05-13,Commercial/residential surveyor,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439
998,999,Fb2FE369D1E171A,Jermaine,Phelps,Male,wanda04@example.net,(915)292-2254,1971-08-31,Ambulance person,NaT,NaT,unknown,people-100000.csv,2024-03-29 15:18:35.601439


In [14]:
config.py

SCHEMA = 'dami-test'
MOJAP_EXTRACTION_TS = 

SyntaxError: invalid syntax (3940289545.py, line 4)

In [None]:
filename = "{schema}.{tablename}-{time}-part-{part}.parquet".format(
            schema=config.SCHEMA,
            tablename=table,
            time=config.MOJAP_EXTRACTION_TS,
        )

In [None]:
Index             Int64
User Id          string
First Name       string
Last Name        string
Sex              string
Email            string
Phone            string
Date of birth    string
Job Title        string