### Step 1: Import Packages

In [1]:
import gzip
from google.cloud import bigquery
from google.cloud import bigquery_storage
import os
import pandas as pd
import json
import warnings
import datetime as dt
import re
warnings.filterwarnings(action="ignore")

### Step 2: Open the gzip files

In [2]:
list_gzip_files = [pos_json for pos_json in os.listdir(os.getcwd() + "/vendor_clustering_gzips") if pos_json.endswith(".gzip")]

df_s3_list = []
for i in list_gzip_files:
    with gzip.open(filename=os.path.join(os.getcwd() + "/vendor_clustering_gzips", i), mode="r") as f:
        country_code = i[0:2] # i[0:2] extracts the country code

        # Read the contents of a gzip file
        vars()["json_" + country_code] = f.read()
        f.close()
        
        # Change the bytes object into a JSON and put it in a dataframe
        vars()["df_country_" + country_code] = pd.DataFrame(json.loads(vars()["json_" + country_code].decode("utf-8")))

        # Add a column to the dataframe with the country_code
        vars()["df_country_" + country_code]["country_code"] = country_code
        df_s3_list.append(vars()["df_country_" + country_code])

        # Add a column to the data frame indicating the type of the strategy
        vars()["df_country_" + country_code]["cluster"] = re.findall(pattern="(?<=.._..-)(.*)(?=-v1)", string=i)[0]

# Combining the generated data frames into one
df_s3 = pd.concat(df_s3_list)

# Convert the "update_timestamp" column from epoch seconds to a readable timestamp
df_s3["update_timestamp"] = df_s3["update_timestamp"].apply(lambda x: dt.datetime.fromtimestamp(int(x)).strftime('%Y-%m-%d %H:%M:%S'))

### Step 3: Query the LB BQ table

In [3]:
client = bigquery.Client(project="logistics-customer-staging")
query = """
    SELECT *
    FROM `logistics-data-storage-staging.long_term_pricing.vendors_clustered`
"""
df_bq = client.query(query=query).result().to_dataframe(bqstorage_client=bigquery_storage.BigQueryReadClient(), progress_bar_type="tqdm")

Downloading: 100%|[32m██████████[0m|


In [4]:
def bq_dataframes_func(dataframe):
    df_check = pd.merge(
        left=dataframe,
        right=df_bq[["country_code", "vendor_id", "cluster"]],
        left_on=["country_code", "vendor_ids", "cluster"],
        right_on=["country_code", "vendor_id", "cluster"],
        how="inner",
    )
    return df_check

dataframe_country_list = [i for i in dir() if i.startswith("df_country_")]

df_check_list = []
for i in dataframe_country_list:
    country_code = i[-2:]
    vars()["df_" + country_code + "_check"] = bq_dataframes_func(dataframe=eval(i))
    df_check_list.append(vars()["df_" + country_code + "_check"])

df_check = pd.concat(df_check_list)

In [6]:
df_s3.groupby(["country_code", "cluster"], as_index=False)["vendor_ids"].count()

Unnamed: 0,country_code,cluster,vendor_ids
0,ae,AFV_Distance_High_Basket_High_Distance,3325
1,ae,AFV_Distance_High_Basket_Low_Distance,2182
2,ae,AFV_Distance_Insufficient_Data,9011
3,ae,AFV_Distance_Low_Basket_High_Distance,3109
4,ae,AFV_Distance_Low_Basket_Low_Distance,4706
...,...,...,...
273,ve,AFV_Distance_Insufficient_Data,1238
274,ve,AFV_Distance_Low_Basket_High_Distance,252
275,ve,AFV_Distance_Low_Basket_Low_Distance,277
276,ve,AFV_Distance_Mid_Basket_Mid_Distance,213


In [7]:
df_bq[df_bq["region"] == "Americas"].groupby(["country_code", "cluster"], as_index=False)["vendor_id"].count()

Unnamed: 0,country_code,cluster,vendor_id
0,ar,AFV_Distance_High_Basket_High_Distance,5223
1,ar,AFV_Distance_High_Basket_Low_Distance,5101
2,ar,AFV_Distance_Insufficient_Data,13961
3,ar,AFV_Distance_Low_Basket_High_Distance,5752
4,ar,AFV_Distance_Low_Basket_Low_Distance,7886
...,...,...,...
85,ve,AFV_Distance_High_Basket_Low_Distance,221
86,ve,AFV_Distance_Insufficient_Data,1238
87,ve,AFV_Distance_Low_Basket_High_Distance,252
88,ve,AFV_Distance_Low_Basket_Low_Distance,277


In [8]:
df_check.groupby(["country_code", "cluster"], as_index=False)[["vendor_id", "vendor_ids"]].count()

Unnamed: 0,country_code,cluster,vendor_id,vendor_ids
0,ae,AFV_Distance_Mid_Basket_Mid_Distance,3986,3986
1,ar,AFV_Distance_Mid_Basket_Mid_Distance,6689,6689
2,at,AFV_Distance_Mid_Basket_Mid_Distance,433,433
3,bd,AFV_Distance_Mid_Basket_Mid_Distance,1399,1399
4,bh,AFV_Distance_Mid_Basket_Mid_Distance,723,723
5,bo,AFV_Distance_Mid_Basket_Mid_Distance,944,944
6,cl,AFV_Distance_Mid_Basket_Mid_Distance,2549,2549
7,cr,AFV_Distance_Mid_Basket_Mid_Distance,417,417
8,cy,AFV_Distance_Mid_Basket_Mid_Distance,321,321
9,cz,AFV_Distance_Mid_Basket_Mid_Distance,792,792


In [9]:
df_check.groupby(["country_code", "cluster"], as_index=False).apply(lambda x: x.notnull().count())

Unnamed: 0,vendor_ids,update_timestamp,country_code,cluster,vendor_id
0,3986,3986,3986,3986,3986
1,6689,6689,6689,6689,6689
2,433,433,433,433,433
3,1399,1399,1399,1399,1399
4,723,723,723,723,723
5,944,944,944,944,944
6,2549,2549,2549,2549,2549
7,417,417,417,417,417
8,321,321,321,321,321
9,792,792,792,792,792


In [10]:
df_check["vendor_id"].isnull().value_counts() # vendor_code is the field coming from bigquery. If it is null, this means that there is a mismatch

vendor_id
False    431499
Name: count, dtype: int64

In [11]:
df_check["vendor_ids"].isnull().value_counts() # vendor_ids is the field coming from S3. If it is null, this means that there is a mismatch

vendor_ids
False    431499
Name: count, dtype: int64