# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [14]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkConf
from typing import Dict, List
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import DateType, StringType, FloatType, ByteType, IntegerType, DecimalType
import os


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

# Analysing global temperature city file

In [12]:
input_dir = "/workspace/home"
output_dir = "/workspace/home/output"
    

In [18]:
def globalcity_temperature_process(spark, input_dir, output_dir):

    """Create/recreate the country-level temperature dimension table from the city file."""
    table_name = "globalcity_temperature_process"
    csv_path = os.path.join(input_dir, "GlobalLandTemperaturesByCity.csv")
    temp_df = spark.read.option("header", True).csv(csv_path)
    print("Before process -----------")
    temp_df.show()  # show() prints the table itself and returns None
    # Drop rows with missing values, then derive year and month from the dt column
    temp_df = temp_df.dropna()
    temp_df = temp_df.withColumn("year", F.year("dt")).withColumn("month", F.month("dt"))
    # Average the city readings per country and month
    temp_df = temp_df.groupBy([F.col("Country").alias("country"), "year", "month"]).agg(
        F.avg("AverageTemperature").alias("average_temperature"),
        F.avg("AverageTemperatureUncertainty").alias("average_temperature_uncertainty"),
    )
    temp_df = temp_df.withColumn("temperature_id", F.monotonically_increasing_id())

    #check_data_quality(temp_df, "temperature_id", table_name)
    output_path = os.path.join(output_dir, table_name)
    temp_df.write.mode("overwrite").parquet(output_path)
    print(f"Data for {table_name} table was successfully processed.")
    temp_df.show()
globalcity_temperature_process(spark, input_dir, output_dir)

Before process -----------
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 
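
The transformation in `globalcity_temperature_process` drops incomplete rows, derives `year` and `month` from `dt`, and averages the city readings per country and month. A minimal plain-Python sketch of the same grouping may help when checking the Spark output on a handful of rows; it assumes rows arrive as dicts keyed by the CSV column names, and `aggregate_country_month` is a hypothetical name.

```python
from collections import defaultdict
from datetime import date
from statistics import mean
from typing import Dict, Iterable, List, Optional, Tuple

# Each input row mirrors the CSV columns used by the Spark job; None marks a missing value.
Row = Dict[str, Optional[str]]


def aggregate_country_month(rows: Iterable[Row]) -> List[Tuple[str, int, int, float, float]]:
    """Average temperature and uncertainty per (country, year, month)."""
    groups: Dict[Tuple[str, int, int], List[Tuple[float, float]]] = defaultdict(list)
    for row in rows:
        # Like DataFrame.dropna(): skip the row if any of its fields is missing.
        if any(value is None for value in row.values()):
            continue
        day = date.fromisoformat(row["dt"])
        key = (row["Country"], day.year, day.month)
        groups[key].append(
            (float(row["AverageTemperature"]), float(row["AverageTemperatureUncertainty"]))
        )

    result = []
    for (country, year, month), values in sorted(groups.items()):
        result.append(
            (country, year, month, mean(t for t, _ in values), mean(u for _, u in values))
        )
    return result
```

Like `dropna()` called without arguments, the sketch discards a row when any of its fields is missing, not only the temperature columns.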

# Analysing U.S. city demographics

In [25]:
def process_demographic(spark: SparkSession, input_dir_path: str, output_dir_path: str) -> DataFrame:
    """Create/recreate the demographic dimension table, aggregated per state."""
    table_name = "demographic"

    # The demographics file is semicolon-delimited
    csv_path = os.path.join(input_dir_path, "us-cities-demographics.csv")
    demog_df = spark.read.option("header", True).options(delimiter=";").csv(csv_path)
    print("Before processing -----------------------------")
    demog_df.show()
    demog_df = demog_df.groupBy(F.col("State Code").alias("state_code")).agg(
        F.sum("Total Population").cast(IntegerType()).alias("total_population"),
        F.sum("Male Population").cast(IntegerType()).alias("male_population"),
        F.sum("Female Population").cast(IntegerType()).alias("female_population"),
        F.sum("Number of Veterans").cast(IntegerType()).alias("number_of_veterans"),
        F.sum("Foreign-born").cast(IntegerType()).alias("foregin_born"),
        F.avg("Median Age").cast(FloatType()).alias("median_age"),
        F.avg("Average Household Size").cast(FloatType()).alias("average_household_size"),
    )

    #check_data_quality(demog_df, "state_code", table_name)
    #output_path = os.path.join(output_dir_path, table_name)
    #demog_df.write.mode("overwrite").parquet(output_path)
    print(f"Data for {table_name} table was successfully processed.")
    return demog_df

demog_df = process_demographic(spark, input_dir, output_dir)

Before processing -----------------------------
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
| 

## Analysing immigration data

In [4]:
immigration_df = spark.read.csv('immigration_data_sample.csv',header=True)

In [5]:
immigration_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = tru

In [6]:
immigration_df.show(truncate=False)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|_c0    |cicid    |i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum       |fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|4.0   |209.0 |209.0 |HHW    |20566.0|1.0    |HI     |20573.0|61.0  |2.0    |1.0  |20160422|null    |null |G      |O      |null   |M      |1955.0 |07202016|F     |null  |JL     |56582674633.0|00782|WT      |
|2171295|4422636.0|2016.0|4.0   |582.0 |582.0 |MCA    |20567.0|1.0  
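
Every column in the sample CSV is read as a string, and the numeric fields carry a trailing `.0` (`cicid`, `i94yr`, `arrdate`). `arrdate` and `depdate` appear to be SAS dates, i.e. day counts from 1960-01-01: in the first row above, `arrdate` 20566 corresponds to 2016-04-22, which matches that row's `dtadfile` value 20160422. The sketch below shows the conversion in plain Python; `sas_to_date` and `to_int` are hypothetical helper names.

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(value: Optional[str]) -> Optional[date]:
    """Convert a SAS day count such as '20566.0' to a calendar date."""
    if value is None or value == "":
        return None
    return SAS_EPOCH + timedelta(days=int(float(value)))


def to_int(value: Optional[str]) -> Optional[int]:
    """Convert float-formatted strings such as '2016.0' to integers."""
    if value is None or value == "":
        return None
    return int(float(value))


print(sas_to_date("20566.0"))  # arrdate of the first sample row: 2016-04-22
```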

In [7]:
immigration_df.count()

1000

In [8]:
immigration_df.dropDuplicates().count()

1000

In [9]:
immigration_df.describe().show(truncate=False)

+-------+-----------------+------------------+------+------+------------------+------------------+-------+-----------------+------------------+-------+------------------+-----------------+------------------+-----+-----------------+--------+-----+-------+-------+-------+-------+-----------------+------------------+------+------------------+-------+--------------------+------------------+--------+
|summary|_c0              |cicid             |i94yr |i94mon|i94cit            |i94res            |i94port|arrdate          |i94mode           |i94addr|depdate           |i94bir           |i94visa           |count|dtadfile         |visapost|occup|entdepa|entdepd|entdepu|matflag|biryear          |dtaddto           |gender|insnum            |airline|admnum              |fltno             |visatype|
+-------+-----------------+------------------+------+------+------------------+------------------+-------+-----------------+------------------+-------+------------------+-----------------+--------------

In [10]:
immigration_df.write.mode("overwrite").parquet('immigration_data')

In [13]:
spark.read.parquet('immigration_data').count()

1000

## Analysing global temperature data

In [5]:
#GlobalLandTemperaturesByCity.csv
temp_df = spark.read.csv('GlobalLandTemperaturesByCity.csv',header=True)

In [6]:
print("Number of rows: ", temp_df.count())
temp_df.limit(5).toPandas()

Number of rows:  600311


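|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|----|--------------------|-------------------------------|------|---------|----------|-----------|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |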


In [7]:
temp_df.count()

600311

In [9]:

temp_df.dropDuplicates().count()


600311

## Analysing airport codes

In [10]:
# Read in the data here
airport_df = spark.read \
    .option("header", True) \
    .csv("airport-codes_csv.csv")

In [11]:
print(airport_df.count())
airport_df.limit(5).toPandas()

55075


|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|-------|------|------|--------------|-----------|-------------|------------|--------------|----------|-----------|------------|-------------|
| 0 | 00A | heliport | Total Rf Heliport | 11 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


In [12]:
airport_df.dropDuplicates().count()

55075
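
Two airport columns would likely need splitting before they can be joined with the other sources. `coordinates` holds longitude and latitude in one string (`-74.93360137939453, 40.07080078125` for the Bensalem heliport), and `iso_region` prefixes the state with the country (`US-PA`), while the demographic data uses bare state codes such as `MD`. A possible cleaning step, sketched in plain Python with hypothetical function names:

```python
from typing import Optional, Tuple


def parse_coordinates(value: str) -> Tuple[float, float]:
    """Split an airport 'coordinates' string into (longitude, latitude)."""
    # float() tolerates the space that follows the comma.
    longitude, latitude = (float(part) for part in value.split(","))
    return longitude, latitude


def region_to_state(iso_region: str, country: str = "US") -> Optional[str]:
    """Return the state part of an ISO region such as 'US-PA', or None outside `country`."""
    prefix = f"{country}-"
    if not iso_region.startswith(prefix):
        return None
    return iso_region[len(prefix):]
```

The state code returned by `region_to_state` could then serve as the join key against `state_code` in the demographic table.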

## Analysing U.S. demographic data

In [14]:
demographic_df = spark.read \
    .option("header", True) \
    .options(delimiter=';') \
    .csv("us-cities-demographics.csv")

In [15]:
print("Number of rows: ", demographic_df.count())

Number of rows:  2891


In [16]:
print(demographic_df.count())
demographic_df.limit(5).toPandas()


2891


In [27]:
us_states_df = pd.read_html('https://www23.statcan.gc.ca/imdb/p3VD.pl?Function=getVD&TVD=53971')[0]
us_states_df = us_states_df.drop(columns=["Abbreviation", "Code"])
us_states_df = us_states_df.rename(columns={"State": "state", "Alpha code": "state_code"})
us_states_df = spark.createDataFrame(us_states_df)

print("Number of rows: ", us_states_df.count())
us_states_df.limit(5).toPandas()

Number of rows:  51


|   | state | state_code |
|---|-------|------------|
| 0 | Alabama | AL |
| 1 | Alaska | AK |
| 2 | Arizona | AZ |
| 3 | Arkansas | AR |
| 4 | California | CA |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
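
The count checks above (`count()` against `dropDuplicates().count()`) cover exact duplicates; missing values need a per-column count as well. In Spark one way to get it is `df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])`. The same profile on a small list of rows, as a plain-Python sketch with a hypothetical `profile` function that also treats empty strings as missing:

```python
from collections import Counter
from typing import Dict, List, Optional


def profile(rows: List[Dict[str, Optional[str]]]) -> Dict[str, object]:
    """Report row count, exact-duplicate rows and missing values per column."""
    columns = sorted({column for row in rows for column in row})
    # Represent each row as a tuple so identical rows compare equal.
    row_counts = Counter(tuple(row.get(column) for column in columns) for row in rows)
    duplicates = sum(count - 1 for count in row_counts.values())
    missing = {
        column: sum(1 for row in rows if row.get(column) in (None, ""))
        for column in columns
    }
    return {"rows": len(rows), "duplicate_rows": duplicates, "missing": missing}
```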

#### Cleaning Steps
Document steps necessary to clean the data

In [18]:
# Performing cleaning tasks here

dim_country_temperature_evolution_df = temp_df.dropna()
dim_country_temperature_evolution_df = dim_country_temperature_evolution_df.withColumn("year", F.year("dt")).withColumn("month", F.month("dt"))
dim_country_temperature_evolution_df = dim_country_temperature_evolution_df.groupBy([F.col("Country").alias("country"), "year", "month"]).agg(
    F.avg("AverageTemperature").alias("average_temperature"),
    F.avg("AverageTemperatureUncertainty").alias("average_temperature_uncertainty"),
) 
dim_country_temperature_evolution_df = dim_country_temperature_evolution_df.withColumn('temperature_id', F.monotonically_increasing_id())

print("Number of rows: ", dim_country_temperature_evolution_df.count())
dim_country_temperature_evolution_df.printSchema()
dim_country_temperature_evolution_df.limit(5).toPandas()


Number of rows:  151549
root
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- average_temperature: double (nullable = true)
 |-- average_temperature_uncertainty: double (nullable = true)
 |-- temperature_id: long (nullable = false)



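|   | country | year | month | average_temperature | average_temperature_uncertainty | temperature_id |
|---|---------|------|-------|---------------------|---------------------------------|----------------|
| 0 | Denmark | 1760 | 11 | 3.447 | 3.927 | 0 |
| 1 | Denmark | 1765 | 3 | 3.12 | 3.209 | 1 |
| 2 | Denmark | 1800 | 6 | 12.283 | 2.118 | 2 |
| 3 | Denmark | 1820 | 4 | 6.789 | 1.467 | 3 |
| 4 | Denmark | 1832 | 4 | 6.936 | 3.908 | 4 |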


In [7]:
from pyspark.sql.dataframe import DataFrame

def get_no_of_chars(df_spark: DataFrame) -> pd.DataFrame:
    """Get the minimum and maximum number of characters per string column."""
    # Add a len_<column> helper column for every string column
    for column, dtype in df_spark.dtypes:
        if dtype == 'string':
            df_spark = df_spark.withColumn(f"len_{column}", F.length(column))

    len_columns = [column for column in df_spark.columns if column.startswith("len_")]
    min_list = [F.min(column) for column in len_columns]
    max_list = [F.max(column) for column in len_columns]
    # Selecting only aggregates returns a single row; transpose it to one statistic per row
    return df_spark.select(min_list + max_list).toPandas().T

get_no_of_chars(dim_country_temperature_evolution_df)

NameError: name 'dim_country_temperature_evolution_df' is not defined
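
`get_no_of_chars` adds a `len_<column>` column for every string column and returns the shortest and longest value of each, which can help when choosing column widths for a target database. A plain-Python sketch of the same statistic (hypothetical name `string_length_range`), ignoring missing values the way Spark's `min` and `max` ignore nulls:

```python
from typing import Dict, List, Optional, Tuple


def string_length_range(rows: List[Dict[str, Optional[str]]]) -> Dict[str, Tuple[int, int]]:
    """Return (min, max) character length for every column holding strings."""
    ranges: Dict[str, Tuple[int, int]] = {}
    for row in rows:
        for column, value in row.items():
            if not isinstance(value, str):
                continue  # skip nulls and non-string values
            length = len(value)
            low, high = ranges.get(column, (length, length))
            ranges[column] = (min(low, length), max(high, length))
    return ranges
```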

In [20]:
dim_demographic_df = demographic_df.groupBy(F.col("State Code").alias("state_code")).agg(
    F.sum("Total Population").cast(IntegerType()).alias("total_population"),
    F.sum("Male Population").cast(IntegerType()).alias("male_population"),
    F.sum("Female Population").cast(IntegerType()).alias("female_population"),
    F.sum("Number of Veterans").cast(IntegerType()).alias("number_of_veterans"),
    F.sum("Foreign-born").cast(IntegerType()).alias("foregin_born"),
    F.avg("Median Age").cast(FloatType()).alias("median_age"),
    F.avg("Average Household Size").cast(FloatType()).alias("average_household_size"),
) 

print("Number of rows: ", dim_demographic_df.count())
dim_demographic_df.printSchema()
dim_demographic_df.limit(5).toPandas()

Number of rows:  49
root
 |-- state_code: string (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foregin_born: integer (nullable = true)
 |-- median_age: float (nullable = true)
 |-- average_household_size: float (nullable = true)



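|   | state_code | total_population | male_population | female_population | number_of_veterans | foregin_born | median_age | average_household_size |
|---|------------|------------------|-----------------|-------------------|--------------------|--------------|------------|------------------------|
| 0 | AZ | 22497710 | 11137275 | 11360435 | 1322525 | 3411565 | 35.037498 | 2.774375 |
| 1 | SC | 2586976 | 1265291 | 1321685 | 163334 | 134019 | 33.825001 | 2.469583 |
| 2 | LA | 6502975 | 3134990 | 3367985 | 348855 | 417095 | 34.625 | 2.465 |
| 3 | MN | 7044165 | 3478803 | 3565362 | 321738 | 1069888 | 35.579628 | 2.496852 |
| 4 | NJ | 6931024 | 3423033 | 3507991 | 146632 | 2327750 | 35.254387 | 2.960877 |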

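A caution on the state totals above: `us-cities-demographics.csv` has `Race` and `Count` columns, so a city can appear on several rows (one per race) with its city-level figures such as `Total Population` repeated on each. Summing those columns over all rows may then count a city more than once. One option is to keep a single row per city before aggregating, for example `demographic_df.dropDuplicates(["City", "State"])` in Spark. The plain-Python sketch below (hypothetical `aggregate_by_state`) shows the idea:

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple


def aggregate_by_state(rows: List[Dict[str, str]]) -> Dict[str, Dict[str, float]]:
    """Sum population and average median age per state, counting each city once."""
    # Keep the first row seen for each (City, State); race-level rows repeat city figures.
    cities: Dict[Tuple[str, str], Dict[str, str]] = {}
    for row in rows:
        cities.setdefault((row["City"], row["State"]), row)

    by_state: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for row in cities.values():
        by_state[row["State Code"]].append(row)

    return {
        state: {
            "total_population": sum(int(r["Total Population"]) for r in city_rows),
            "median_age": mean(float(r["Median Age"]) for r in city_rows),
        }
        for state, city_rows in by_state.items()
    }
```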

In [25]:
dim_us_states_df = us_states_df

print("Number of rows: ", dim_us_states_df.count())
dim_us_states_df.printSchema()
dim_us_states_df.limit(5).toPandas()

NameError: name 'us_states_df' is not defined

In [26]:
# read_i94_descr comes from the project's own utils.read_data module, which has to be
# importable from the notebook's working directory
from utils.read_data import read_i94_descr
input_dir_path = "raw_data"

# Lookup of I94VISA codes to category descriptions, used by replace() below
visa_category = read_i94_descr("I94VISA", input_dir_path)
dim_visa_df = immigration_df.select(
    F.col("visatype").alias("visa_type"),
    F.col("visapost").alias("visa_issuer"),
    F.col("i94visa").cast(IntegerType()).cast(StringType()).alias("visa_category_code"),
).dropDuplicates()
dim_visa_df = dim_visa_df.withColumn("visa_category", F.col("visa_category_code")).replace(visa_category, subset="visa_category")

window_visa = Window.orderBy("visa_type")
dim_visa_df = dim_visa_df.withColumn("visa_id", F.row_number().over(window_visa))

print(dim_visa_df.count())
dim_visa_df.printSchema()
dim_visa_df.limit(5).toPandas()

ModuleNotFoundError: No module named 'utils'
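
`dim_visa_df` keeps the distinct combinations of visa type, issuing post and I94VISA category, replaces the category code with its description and numbers the rows with `row_number()` ordered by `visa_type`. A plain-Python sketch of those steps follows. The code-to-label mapping (1 = Business, 2 = Pleasure, 3 = Student) is an assumption taken from the I94 label descriptions that `read_i94_descr` presumably reads, and `build_visa_dimension` is a hypothetical name. Because `Window.orderBy("visa_type")` alone leaves rows with the same visa type in no guaranteed order, the sketch also sorts on the other columns so the ids are reproducible.

```python
from typing import Dict, List, Optional

# Assumed labels for I94VISA codes; the notebook loads them with read_i94_descr.
VISA_CATEGORIES = {"1": "Business", "2": "Pleasure", "3": "Student"}


def build_visa_dimension(rows: List[Dict[str, Optional[str]]]) -> List[Dict[str, object]]:
    """Distinct visa combinations with a category label and a sequential visa_id."""
    distinct = {
        (
            row.get("visatype"),
            row.get("visapost"),
            # '2.0' -> '2', mirroring the int-then-string cast in the Spark code
            str(int(float(row["i94visa"]))) if row.get("i94visa") else None,
        )
        for row in rows
    }
    # Sort on visa_type first; the other fields break ties. None sorts as ''.
    ordered = sorted(distinct, key=lambda combo: tuple(value or "" for value in combo))

    dimension = []
    for visa_id, (visa_type, visa_issuer, code) in enumerate(ordered, start=1):
        dimension.append({
            "visa_id": visa_id,
            "visa_type": visa_type,
            "visa_issuer": visa_issuer,
            "visa_category_code": code,
            # Unknown codes are kept unchanged, as DataFrame.replace() does.
            "visa_category": VISA_CATEGORIES.get(code, code),
        })
    return dimension
```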

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks
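
The checks listed above could take roughly this shape. The sketch works on a list of dict rows and uses hypothetical names (`check_table`, `DataQualityError`); it is one possible reading of the `check_data_quality` calls that are commented out in the processing functions, whose implementation is not shown in this notebook. On a Spark DataFrame the same tests map to `df.count()`, `df.filter(F.col(key).isNull()).count()` and `df.select(key).distinct().count()`.

```python
from typing import Dict, List, Optional


class DataQualityError(Exception):
    """Raised when a table fails a quality check."""


def check_table(
    rows: List[Dict[str, object]],
    key_column: str,
    table_name: str,
    expected_rows: Optional[int] = None,
) -> None:
    """Fail if the table is empty, its key has nulls or duplicates, or the count is off."""
    if not rows:
        raise DataQualityError(f"{table_name}: table is empty")
    keys = [row.get(key_column) for row in rows]
    if any(key is None for key in keys):
        raise DataQualityError(f"{table_name}: null values in {key_column}")
    if len(set(keys)) != len(keys):
        raise DataQualityError(f"{table_name}: duplicate values in {key_column}")
    # Source/count check: compare against the number of rows expected from the source.
    if expected_rows is not None and len(rows) != expected_rows:
        raise DataQualityError(
            f"{table_name}: expected {expected_rows} rows, found {len(rows)}"
        )
    print(f"Data quality checks passed for {table_name} ({len(rows)} rows).")
```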

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd

def read_continent_list(spark: SparkSession) -> DataFrame:
    link = "https://www.php.net/manual/en/function.geoip-continent-code-by-name.php"
    pd_df = pd.read_html(link, keep_default_na=False)[0]
    print(pd_df)
    
    schema = StructType([
        StructField("Code", StringType(), True),
        StructField("Continent name", StringType(), True)
    ])
    
    spark_df = spark.createDataFrame(pd_df, schema=schema)
    print(f"Continent list from {link} was successfully read.")
    return spark_df

read_continent_list(spark)

  Code Continent name
0   AF         Africa
1   AN     Antarctica
2   AS           Asia
3   EU         Europe
4   NA  North america
5   OC        Oceania
6   SA  South america
Continent list from https://www.php.net/manual/en/function.geoip-continent-code-by-name.php was successfully read.


DataFrame[Code: string, Continent name: string]

In [8]:
def read_continent_list(spark: SparkSession) -> DataFrame:
    """Read the continent code list from the PHP GeoIP documentation page."""
    link = "https://www.php.net/manual/en/function.geoip-continent-code-by-name.php"
    # keep_default_na=False stops pandas from parsing the code "NA" (North America) as missing
    pd_df = pd.read_html(link, keep_default_na=False)[0]
    spark_df = spark.createDataFrame(pd_df)
    spark_df.show()
    print(f"Continent list from {link} was successfully read.")
    return spark_df
read_continent_list(spark)

+----+--------------+
|Code|Continent name|
+----+--------------+
|  AF|        Africa|
|  AN|    Antarctica|
|  AS|          Asia|
|  EU|        Europe|
|  NA| North america|
|  OC|       Oceania|
|  SA| South america|
+----+--------------+

Continent list from https://www.php.net/manual/en/function.geoip-continent-code-by-name.php was successfully read.


DataFrame[Code: string, Continent name: string]

In [4]:
def read_country_list(spark: SparkSession) -> DataFrame:
    """Read the country list from 'https://countrycode.org/'."""
    link = "https://countrycode.org/"
    pd_df = pd.read_html(link)[1]
    pd_df["ISO CODES"] = pd_df["ISO CODES"].apply(lambda value: value.split(" / ")[0])
    pd_df.drop(columns=["COUNTRY CODE"], inplace=True)
    pd_df.rename(
        columns={"COUNTRY": "country", "ISO CODES": "country_code"}, inplace=True
    )

    spark_df = spark.createDataFrame(pd_df)
    print(f"Country list from {link} was successfully read.")
    spark_df.show()
    return spark_df
read_country_list(spark)

Country list from https://countrycode.org/ was successfully read.
+-------------------+------------+
|            country|country_code|
+-------------------+------------+
|        Afghanistan|          AF|
|            Albania|          AL|
|            Algeria|          DZ|
|     American Samoa|          AS|
|            Andorra|          AD|
|             Angola|          AO|
|           Anguilla|          AI|
|         Antarctica|          AQ|
|Antigua and Barbuda|          AG|
|          Argentina|          AR|
|            Armenia|          AM|
|              Aruba|          AW|
|          Australia|          AU|
|            Austria|          AT|
|         Azerbaijan|          AZ|
|            Bahamas|          BS|
|            Bahrain|          BH|
|         Bangladesh|          BD|
|           Barbados|          BB|
|            Belarus|          BY|
+-------------------+------------+
only showing top 20 rows



DataFrame[country: string, country_code: string]

In [7]:
def read_country_list(spark: SparkSession) -> DataFrame:
    """Read the country list from 'https://countrycode.org/'."""
    link = "https://countrycode.org/"
    pd_df = pd.read_html(link)[1]
    print(pd_df)
    pd_df["ISO CODES"] = pd_df["ISO CODES"].apply(lambda value: value.split(" / ")[0])
    pd_df.drop(columns=["COUNTRY CODE"], inplace=True)
    pd_df.rename(
        columns={"COUNTRY": "country", "ISO CODES": "country_code"}, inplace=True
    )

    spark_df = spark.createDataFrame(pd_df)
    print(f"Country list from {link} was successfully read.")
    spark_df.show()
    return spark_df
read_country_list(spark)

                            COUNTRY COUNTRY CODE ISO CODES
0                       Afghanistan           93  AF / AFG
1                           Albania          355  AL / ALB
2                           Algeria          213  DZ / DZA
3                    American Samoa        1-684  AS / ASM
4                           Andorra          376  AD / AND
5                            Angola          244  AO / AGO
6                          Anguilla        1-264  AI / AIA
7                        Antarctica          672  AQ / ATA
8               Antigua and Barbuda        1-268  AG / ATG
9                         Argentina           54  AR / ARG
10                          Armenia          374  AM / ARM
11                            Aruba          297  AW / ABW
12                        Australia           61  AU / AUS
13                          Austria           43  AT / AUT
14                       Azerbaijan          994  AZ / AZE
15                          Bahamas        1-242  BS / B

DataFrame[country: string, country_code: string]

In [10]:
import boto3

# Create an S3 client. With no keys passed in, boto3 resolves credentials from its
# default chain (environment variables, the shared ~/.aws/credentials file, or an
# attached IAM role), which keeps secrets out of the notebook.
s3 = boto3.client('s3')

In [12]:
import boto3
import os

def upload_parquet_files_to_s3(local_directory, s3_bucket):
    """Upload every .parquet file found under local_directory to s3_bucket."""
    # Credentials come from boto3's default chain instead of hard-coded keys
    s3 = boto3.client('s3')

    for root, dirs, files in os.walk(local_directory):
        for file in files:
            if file.endswith('.parquet'):
                local_path = os.path.join(root, file)
                # The object key is "<bucket name>/<file name>": objects land under a prefix
                # named after the bucket, and local subdirectories are flattened
                s3_key = f"{s3_bucket}/{file}"
                s3.upload_file(local_path, s3_bucket, s3_key)
                print(f"Uploaded {local_path} to S3 bucket {s3_bucket} at {s3_key}")

# Usage example
local_directory = 'path_to_save_parquet_file.parquet'
s3_bucket = 'capstoneglobaltemp'

upload_parquet_files_to_s3(local_directory, s3_bucket)

Uploaded path_to_save_parquet_file.parquet/part-00074-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00074-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00192-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00192-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00121-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00121-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00073-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00073-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00038-9d82d212-f51e-

Uploaded path_to_save_parquet_file.parquet/part-00085-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00085-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00166-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00166-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00027-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00027-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00131-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00131-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00078-9d82d212-f51e-

Uploaded path_to_save_parquet_file.parquet/part-00063-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00063-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00149-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00149-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00143-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00143-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00159-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00159-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00075-9d82d212-f51e-

Uploaded path_to_save_parquet_file.parquet/part-00065-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00065-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00080-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00080-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00031-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00031-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00162-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00162-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00071-9d82d212-f51e-

Uploaded path_to_save_parquet_file.parquet/part-00175-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00175-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00046-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00046-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00099-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00099-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00146-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00146-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00041-9d82d212-f51e-

Uploaded path_to_save_parquet_file.parquet/part-00024-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00024-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00120-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00120-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00062-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00062-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00136-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet to S3 bucket capstoneglobaltemp at capstoneglobaltemp/part-00136-9d82d212-f51e-4a86-bc40-4a410a409507-c000.snappy.parquet
Uploaded path_to_save_parquet_file.parquet/part-00039-9d82d212-f51e-
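
`upload_parquet_files_to_s3` keys each object by file name only, prefixed with the bucket name, so every object lands under `capstoneglobaltemp/` inside the `capstoneglobaltemp` bucket and the local directory structure is lost: files from several tables or partition folders would be mixed together and could overwrite each other if names repeat. A sketch of an alternative that keeps each file's path relative to the local directory under a chosen prefix (hypothetical names `s3_key_for` and `iter_parquet_uploads`):

```python
import os
import posixpath
from typing import Iterator, Tuple


def s3_key_for(local_path: str, local_directory: str, prefix: str) -> str:
    """Build an S3 object key that keeps the file's path relative to local_directory."""
    relative = os.path.relpath(local_path, local_directory)
    # S3 keys always use '/', whatever the local OS path separator is.
    return posixpath.join(prefix, *relative.split(os.sep))


def iter_parquet_uploads(local_directory: str, prefix: str) -> Iterator[Tuple[str, str]]:
    """Yield (local_path, s3_key) pairs for every .parquet file under local_directory."""
    for root, _dirs, files in os.walk(local_directory):
        for name in files:
            if name.endswith(".parquet"):
                local_path = os.path.join(root, name)
                yield local_path, s3_key_for(local_path, local_directory, prefix)
```

Each pair could then be passed to `s3.upload_file(local_path, s3_bucket, key)`.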
