In [72]:
# # read the log file in output folder and generate a csv file
# import os
# import sys
# import csv
# import re
# import datetime

# # Specify the path to the log file
# # find the log file in the output folder
# output_folder = "output"
# log_files = []
# for root, dirs, files in os.walk(output_folder):
#     for file in files:
#         if ".inprogress" in file:
#             log_files.append(os.path.join(root, file))
# if len(log_files) == 0:
#     print("No log file found in the output folder")
#     sys.exit(1)
# # get the latest log file
# log_file_path = log_files[0]
# for log_file in log_files:
#     if os.path.getmtime(log_file) > os.path.getmtime(log_file_path):
#         log_file_path = log_file
# print("log file path: ", log_file_path)

# # Specify the path to the output CSV file
# csv_file_path = "output/log.csv"

# # Open the log file for reading
# with open(log_file_path, "r") as log_file:
#     # Read the contents of the log file
#     log_contents = log_file.read().splitlines()

# # Open the CSV file for writing
# with open(csv_file_path, "w", newline="") as csv_file:
#     # Create a CSV writer object
#     csv_writer = csv.writer(csv_file)

#     # Write the header row
#     csv_writer.writerow(["c_custkey", "c_name", "c_acctbal", "c_address", "n_name", "c_phone", "c_comment", "deltaRevenue", "revenue"])
    
#     # Write each line in the log file into the CSV file
#     # change the delimeter from "|" to ","
#     for line in log_contents:
#         # Split the line into fields
#         fields = line.split("|")
#         # Write the fields into the CSV file
#         csv_writer.writerow(fields)

In [65]:
import pandas as pd

# read the mysql query result
mysql_csv_path = "mysql_test/queryResult_mysql.csv"
flink_csv_path = "output/log.csv"

# read csv file
mysql_df = pd.read_csv(mysql_csv_path)
flink_df = pd.read_csv(flink_csv_path)

mysql_df = mysql_df[["c_custkey", "revenue"]]
flink_df = flink_df[["c_custkey", "revenue"]]

join_df = pd.merge(mysql_df, flink_df, on="c_custkey", how="inner", suffixes=("_mysql", "_flink"))
join_df["diff"] = join_df["revenue_mysql"] - join_df["revenue_flink"]
consistent = join_df[join_df["diff"] == 0]
inconsistent = join_df[join_df["diff"] != 0]

all_custkey = mysql_df["c_custkey"].unique()
missing_custkey = [custkey for custkey in all_custkey if custkey not in join_df["c_custkey"].unique()]
consistent_custkey = consistent["c_custkey"].unique()
inconsistent_custkey = [custkey for custkey in all_custkey if custkey not in consistent_custkey]

In [66]:
# calculate ratio of consistent and inconsistent
total_custkey = len(all_custkey)
consistent_ratio = len(consistent_custkey) / total_custkey
inconsistent_ratio = len(inconsistent_custkey) / total_custkey
missing_ratio = len(missing_custkey) / total_custkey
print("consistent ratio: ", consistent_ratio)
print("inconsistent ratio: ", inconsistent_ratio)
print("missing ratio: ", missing_ratio)


consistent ratio:  0.7117794486215538
inconsistent ratio:  0.2882205513784461
missing ratio:  0.09774436090225563


In [71]:
diff_less_than_10 = join_df[join_df["diff"]/join_df['revenue_mysql'].abs() < 0.01]
diff_less_than_10

Unnamed: 0,c_custkey,revenue_mysql,revenue_flink,diff
9,679,378211.3252,378211.3252,0.0
18,1201,374331.5340,374331.5340,0.0
25,422,366451.0126,366451.0126,0.0
32,334,360370.7550,360370.7550,0.0
40,805,359448.9036,359448.9036,0.0
...,...,...,...,...
1144,368,5544.4850,5544.4850,0.0
1145,931,5288.0544,5288.0544,0.0
1146,823,5092.6080,5092.6080,0.0
1147,733,1691.8360,1691.8360,0.0


In [75]:
import os
import sys
import csv
import re
import datetime

# folder = "src/main/resources/data"
folder = "tpch_datasets/data_sf0.8"

# file needs update operations (in dependent order)
files = ['nation.tbl', 'customer.tbl', 'orders.tbl', 'lineitem.tbl']

fieldnames = {
    "nation": ["N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"],
    "customer": ["C_CUSTKEY", "C_NAME", "C_ADDRESS", "C_NATIONKEY", "C_PHONE", "C_ACCTBAL", "C_MKTSEGMENT", "C_COMMENT"],
    "orders": ["O_ORDERKEY", "O_CUSTKEY", "O_ORDERSTATUS", "O_TOTALPRICE", "O_ORDERDATE", "O_ORDERPRIORITY", "O_CLERK", "O_SHIPPRIORITY", "O_COMMENT"],
    "lineitem": ["L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY", "L_LINENUMBER", "L_QUANTITY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_TAX", "L_RETURNFLAG", "L_LINESTATUS", "L_SHIPDATE", "L_COMMITDATE", "L_RECEIPTDATE", "L_SHIPINSTRUCT", "L_SHIPMODE", "L_COMMENT"]
}

for file in files:
    # Open the raw data file for reading
    with open(os.path.join(folder, file), "r") as raw_data_file:
        # Read the contents of the log file
        file_contents = raw_data_file.read().splitlines()

    # Open the CSV file for writing
    with open(folder+"/"+file.replace(".tbl",".csv"), "w", newline="") as csv_file:
        # Create a CSV writer object
        csv_writer = csv.writer(csv_file)

        # Write the header row
        csv_writer.writerow(fieldnames[file.split(".")[0]]) # remove the file extension
        
        # Write each line in the log file into the CSV file
        # change the delimeter from "|" to ","
        for line in file_contents:
            # Split the line into fields
            fields = line.split("|")
            # Write the fields into the CSV file
            csv_writer.writerow(fields)


In [79]:
import pandas as pd

# 对比sf0.8和sf1.0的数据，看是否一致
# folder = "tpch_datasets/data_sf1.0"
files = ['nation.csv', 'customer.csv', 'orders.csv', 'lineitem.csv']
fieldnames = {
    "nation": ["N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"],
    "customer": ["C_CUSTKEY", "C_NAME", "C_ADDRESS", "C_NATIONKEY", "C_PHONE", "C_ACCTBAL", "C_MKTSEGMENT", "C_COMMENT"],
    "orders": ["O_ORDERKEY", "O_CUSTKEY", "O_ORDERSTATUS", "O_TOTALPRICE", "O_ORDERDATE", "O_ORDERPRIORITY", "O_CLERK", "O_SHIPPRIORITY", "O_COMMENT"],
    "lineitem": ["L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY", "L_LINENUMBER", "L_QUANTITY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_TAX", "L_RETURNFLAG", "L_LINESTATUS", "L_SHIPDATE", "L_COMMITDATE", "L_RECEIPTDATE", "L_SHIPINSTRUCT", "L_SHIPMODE", "L_COMMENT"]
}

for file in files:
    table_s08 = pd.read_csv(f"tpch_datasets/data_sf0.8/{file}")
    table_s10 = pd.read_csv(f"tpch_datasets/data_sf1.0/{file}")
    
    pk = fieldnames[file.split(".")[0]][0]
    compare_field = fieldnames[file.split(".")[0]][1]
    join_df = pd.merge(table_s08, table_s10, on=pk, how="inner", suffixes=("_sf08", "_sf10"))
    
    join_df["diff"] = join_df[compare_field+"_sf08"] == join_df[compare_field+"_sf10"]
    consistent = join_df[join_df["diff"] == True]
    inconsistent = join_df[join_df["diff"] == False]
    
    print(file)
    print("consistent: ", len(consistent))
    print("inconsistent: ", len(inconsistent))
    print("missing: ", len(table_s10) - len(consistent) - len(inconsistent))


nation.csv
consistent:  25
inconsistent:  0
missing:  0
customer.csv
consistent:  120000
inconsistent:  0
missing:  30000
orders.csv
consistent:  9506076
inconsistent:  10492239
missing:  -18498315


KeyboardInterrupt: 