Step 2: Schema Matching and Mapping

Let's start with Step 2.1 (Schema Matching).
Step 2.1: Schema Matching (String Matching Techniques)

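The first task is to identify similarities between the attribute names of the different source schemas. We can use string matching techniques such as Levenshtein distance and Jaccard similarity to find naming similarities; the code below also adds TF-IDF cosine similarity over the column names and combines the three scores in a weighted sum.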

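Levenshtein distance counts the minimum number of single-character edits (insertions, deletions or substitutions) needed to turn one string into another. It works well for detecting minor spelling or casing differences in attribute names. Dividing the distance by the length of the longer name and subtracting the result from 1 gives a similarity score between 0 and 1.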
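To make the metric concrete, here is a minimal pure-Python sketch of the classic dynamic-programming Levenshtein distance, together with the same normalization the matching code applies. It is for illustration only: the notebook itself calls the python-Levenshtein package, and the function names below are hypothetical.

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of insertions, deletions and substitutions turning a into b."""
    # prev[j] = distance between the first i-1 chars of a and the first j chars of b
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]  # distance from a[:i] to the empty string
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # delete ca
                            curr[j - 1] + 1,      # insert cb
                            prev[j - 1] + cost))  # substitute (or keep a match)
        prev = curr
    return prev[-1]


def levenshtein_similarity(a: str, b: str) -> float:
    """1 - distance / length of the longer string; 1.0 means identical."""
    longest = max(len(a), len(b))
    return 1.0 if longest == 0 else 1 - levenshtein(a, b) / longest


print(levenshtein("address", "Address"))             # 1 (one substitution)
print(levenshtein_similarity("address", "Address"))  # ≈ 0.857 (= 1 - 1/7)
```

For names that differ only in case, the Jaccard and TF-IDF terms in the matching cell are both 1 (they compare lowercased names), so the weighted score is 0.4 · 6/7 + 0.3 + 0.3 ≈ 0.943, which is the value reported for Address/address in the output of that cell.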

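Jaccard similarity measures the overlap between two sets as the size of their intersection divided by the size of their union. Here it compares attribute names by treating each name as a set of characters (treating names as sets of word tokens is an alternative).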
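The choice between characters and tokens matters. The sketch below (helper names are hypothetical, and the tokenizer rule for snake_case and CamelCase is an assumption) compares both on two column names from this dataset: character sets rate ContractStartDate and ContractEndDate as almost identical, while word tokens separate them. Tokenization only helps when names carry separators or camel case; an all-lowercase name such as "contractstartdate" stays a single token.

```python
import re


def char_jaccard(a: str, b: str) -> float:
    """Jaccard similarity of the case-insensitive character sets of two names."""
    s1, s2 = set(a.lower()), set(b.lower())
    union = s1 | s2
    return len(s1 & s2) / len(union) if union else 1.0


def tokens(name: str) -> set:
    """Split snake_case / CamelCase names into lowercase word tokens (hypothetical helper)."""
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", name)  # ContractStartDate -> Contract Start Date
    return {t.lower() for t in re.split(r"[\s_\-]+", spaced) if t}


def token_jaccard(a: str, b: str) -> float:
    """Jaccard similarity of the word-token sets of two names."""
    t1, t2 = tokens(a), tokens(b)
    union = t1 | t2
    return len(t1 & t2) / len(union) if union else 1.0


print(char_jaccard("ContractStartDate", "ContractEndDate"))   # 8/9 ≈ 0.889
print(token_jaccard("ContractStartDate", "ContractEndDate"))  # 0.5
print(token_jaccard("client_id", "ClientID"))                 # 1.0
```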

In [14]:
import pandas as pd
import numpy as np
from Levenshtein import distance as levenshtein_distance
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Load the merged CSV files into Pandas DataFrames
files = {
    "mysql": "merged/merged_mysql.csv",
    "postgres": "merged/merged_postgres.csv",
    "web": "merged/merged_web.csv",
    "xml": "merged/merged_xml.csv",
    "cassandra": "merged/merged_cassandra.csv",
    "mongo": "merged/merged_mongo.csv"
}

dataframes = {key: pd.read_csv(
    value, encoding='utf-8', low_memory=False) for key, value in files.items()}

# Extract column names from each dataframe
schemas = {key: list(df.columns) for key, df in dataframes.items()}

# Flatten schema into a list of (source, column_name)
schema_list = [(source, col)
               for source, columns in schemas.items() for col in columns]
column_names = [col for _, col in schema_list]

# Compute pairwise normalized Levenshtein similarity: 1 - distance / length of the longer name


def compute_levenshtein_similarity(col1, col2):
    return 1 - (levenshtein_distance(col1, col2) / max(len(col1), len(col2)))


levenshtein_scores = np.zeros((len(column_names), len(column_names)))

for i in range(len(column_names)):
    for j in range(len(column_names)):
        levenshtein_scores[i, j] = compute_levenshtein_similarity(
            column_names[i], column_names[j])

# Compute Jaccard similarity between column names


def jaccard_similarity(str1, str2):
    set1, set2 = set(str1.lower()), set(str2.lower())
    return len(set1 & set2) / len(set1 | set2)


jaccard_scores = np.zeros((len(column_names), len(column_names)))

for i in range(len(column_names)):
    for j in range(len(column_names)):
        jaccard_scores[i, j] = jaccard_similarity(
            column_names[i], column_names[j])

# Compute TF-IDF cosine similarity (each column name is one document;
# TfidfVectorizer lowercases the text by default)
tfidf_matrix = TfidfVectorizer().fit_transform(column_names)
cosine_sim_matrix = cosine_similarity(tfidf_matrix, tfidf_matrix)

# Combine all similarity metrics (weighted sum)
final_similarity_scores = (levenshtein_scores * 0.4) + \
    (jaccard_scores * 0.3) + (cosine_sim_matrix * 0.3)

# Create a DataFrame for similarity results
similarity_df = pd.DataFrame(
    final_similarity_scores, index=column_names, columns=column_names)

# Print highly similar column pairs (threshold > 0.7)
pairs = []
threshold = 0.7
for i in range(len(column_names)):
    for j in range(i + 1, len(column_names)):
        if final_similarity_scores[i, j] > threshold:
            pairs.append(
                (column_names[i], column_names[j], final_similarity_scores[i, j]))

similar_columns = pd.DataFrame(
    pairs, columns=["Column 1", "Column 2", "Similarity Score"])
print(similar_columns.sort_values(by="Similarity Score", ascending=False))

             Column 1           Column 2  Similarity Score
3           billingid          billingid          1.000000
10             Status             Status          1.000000
7                name               name          1.000000
4         billingdate        billingdate          1.000000
15            Address            Address          1.000000
19     policy_details     policy_details          1.000000
13               Name               Name          1.000000
14            Address            address          0.942857
17            address            Address          0.942857
11             Status             status          0.933333
16             Status             status          0.933333
1   contractstartdate  ContractStartDate          0.929412
2     contractenddate    ContractEndDate          0.920000
12               Name               name          0.900000
6                name               Name          0.900000
9            areatype           AreaType          0.9000
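Most of the top-scoring pairs in this output are the same name found in two sources, or names that differ only in case. One option, sketched here as an alternative rather than what the notebook does, is to group such trivially equivalent names by a normalized key before any fuzzy scoring, keeping the source of each column so cross-source matches stay visible. The normalization rule (lowercase, keep only letters and digits), the function names, and the assignment of the CamelCase columns to the "xml" source in the toy input are assumptions for illustration.

```python
import re
from collections import defaultdict


def normalize(name: str) -> str:
    """Lowercase and keep only letters and digits: 'Contract_Start Date' -> 'contractstartdate'."""
    return re.sub(r"[^a-z0-9]", "", name.lower())


def group_equivalent_columns(schemas: dict) -> dict:
    """Map each normalized key to its (source, column) pairs, keeping keys seen in 2+ sources."""
    groups = defaultdict(list)
    for source, columns in schemas.items():
        for col in columns:
            groups[normalize(col)].append((source, col))
    return {key: members for key, members in groups.items()
            if len({source for source, _ in members}) > 1}


# Toy input: column names appear in this section, the "xml" assignment is illustrative
schemas = {
    "mysql": ["customerid", "contractstartdate", "billingid"],
    "postgres": ["clientid", "billingid", "areatype"],
    "cassandra": ["client_id", "address"],
    "xml": ["ContractStartDate", "AreaType", "Address"],
}
for key, members in group_equivalent_columns(schemas).items():
    print(key, members)
```

Names that survive this step without a partner (such as customerid here) are the ones where the weighted fuzzy score is actually needed, for example to relate customerid and clientid.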

Step 2.2: Schema Mapping (GAV, LAV, GLAV)

Once we have matched the schemas using string similarity, we need to define mappings between the local (source) schemas and a global schema. For this example, we define a simple mediated global schema in which each global attribute lists the source column names that correspond to it.
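A mediated schema written as lists of candidate local names per global attribute can be inverted, per source, into a rename map from local columns to global attributes. The sketch below is one way to do that under stated assumptions: it works on plain lists and dicts instead of DataFrames, the first candidate present in a source wins, a local column claimed by two global attributes is treated as an error, and the function names and the trimmed-down schema are hypothetical. The sample row reuses values from this section's output except totalamount, which is made up.

```python
def resolve_mapping(global_schema: dict, source_columns: list) -> dict:
    """Return {local_column: global_attribute} for one source.

    For each global attribute the first candidate present in the source wins;
    a local column claimed by two global attributes raises ValueError.
    """
    available = set(source_columns)
    mapping = {}
    for global_attr, candidates in global_schema.items():
        for candidate in candidates:
            if candidate in available:
                if candidate in mapping:
                    raise ValueError(
                        f"{candidate!r} maps to both {mapping[candidate]!r} and {global_attr!r}")
                mapping[candidate] = global_attr
                break
    return mapping


def to_global_rows(rows: list, mapping: dict) -> list:
    """Rename mapped columns to global names; columns without a mapping are dropped."""
    return [{mapping[col]: value for col, value in row.items() if col in mapping}
            for row in rows]


# Trimmed-down mediated schema in the same shape as global_schema in this section
global_schema = {
    "CustomerID": ["customerid", "clientid", "client_id", "_id"],
    "CustomerName": ["customername", "name", "Name"],
    "AreaType": ["areatype", "AreaType"],
    "BillingAmount": ["totalamount", "amountdue"],
}
postgres_columns = ["clientid", "name", "fulladdress", "areatype", "totalamount"]
sample_rows = [{"clientid": 1, "name": "Customer_1", "fulladdress": "rua 1, Faro",
                "areatype": "residential", "totalamount": 10.0}]  # totalamount is made up

mapping = resolve_mapping(global_schema, postgres_columns)
print(mapping)
print(to_global_rows(sample_rows, mapping))
```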
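GAV (Global-as-View):

For this, we define each element of the global schema as a view (a query) over the source schemas and transform the source data to fit the global schema. A query over the global schema is answered by unfolding: each global element is replaced by its definition over the sources.

LAV (Local-as-View):

Here the direction is reversed: each source is described as a view over the global schema. Adding a source only requires describing that source, but a query over the global schema must be rewritten in terms of the available source views.

GLAV (Global-Local-as-View):

GLAV combines both directions by relating a query over the sources to a query over the global schema, which allows, for example, a join across sources to be mapped onto global attributes.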
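The difference between GAV and LAV can be shown in miniature. In this sketch the global schema is a single relation with attributes such as CustomerID and CustomerName, the sources are lists of row dicts, and the row values (and the "industrial" area type) are made up; only the column names come from this section. The LAV part is a simplified rewriting that only handles projection queries over one global relation: it uses every source whose description covers all requested attributes. All names are hypothetical.

```python
# Toy source data: column names from this section, values made up
sources = {
    "PostgreSQL": [{"clientid": 1, "name": "Customer_1", "areatype": "residential"},
                   {"clientid": 2, "name": "Customer_2", "areatype": "industrial"}],
    "MySQL": [{"customerid": 3, "customername": "Customer_3", "contractid": 7}],
}

# GAV: the global relation Customer is *defined* as a query over the sources,
# namely the union of these renamed projections (local column -> global attribute)
GAV_CUSTOMER = [
    ("PostgreSQL", {"clientid": "CustomerID", "name": "CustomerName"}),
    ("MySQL", {"customerid": "CustomerID", "customername": "CustomerName"}),
]


def gav_answer(view_definition, attributes):
    """Unfold the global relation into its source queries, then project the attributes."""
    result = []
    for source, renames in view_definition:
        for row in sources[source]:
            global_row = {g: row[local] for local, g in renames.items()}
            result.append({a: global_row[a] for a in attributes})
    return result


# LAV: each source is *described* as a view over the global schema, here the
# global attributes it can supply and the local column holding each of them
LAV_VIEWS = {
    "PostgreSQL": {"CustomerID": "clientid", "CustomerName": "name", "AreaType": "areatype"},
    "MySQL": {"CustomerID": "customerid", "CustomerName": "customername",
              "ContractID": "contractid"},
}


def lav_answer(attributes):
    """Union the results of every source whose view covers all requested attributes."""
    result = []
    for source, view in LAV_VIEWS.items():
        if all(a in view for a in attributes):
            result.extend({a: row[view[a]] for a in attributes} for row in sources[source])
    return result


print(gav_answer(GAV_CUSTOMER, ["CustomerID", "CustomerName"]))
print(lav_answer(["CustomerID", "AreaType"]))  # only PostgreSQL provides AreaType
```

The trade-off is visible in the code: adding a source under GAV means editing the definition of every global relation it contributes to, whereas under LAV it means adding one entry to LAV_VIEWS, at the cost of a rewriting step (here, the coverage check in lav_answer) when answering queries.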

In [82]:
import pandas as pd
from itertools import combinations
from fuzzywuzzy import fuzz

# Load merged CSV files (low_memory=False parses each file in one pass, so a
# column's dtype is inferred from all of its values instead of chunk by chunk)
mysql_df = pd.read_csv('merged/merged_mysql.csv', low_memory=False)
postgres_df = pd.read_csv('merged/merged_postgres.csv', low_memory=False)
cassandra_df = pd.read_csv('merged/merged_cassandra.csv', low_memory=False)
mongo_df = pd.read_csv('merged/merged_mongo.csv', low_memory=False)
xml_df = pd.read_csv('merged/merged_xml.csv', low_memory=False)
web_df = pd.read_csv('merged/merged_web.csv', low_memory=False)

# Print column names for debugging
print("MySQL Columns:", mysql_df.columns.tolist())
print("PostgreSQL Columns:", postgres_df.columns.tolist())
print("Cassandra Columns:", cassandra_df.columns.tolist())
print("MongoDB Columns:", mongo_df.columns.tolist())
print("XML Columns:", xml_df.columns.tolist())
print("Web Columns:", web_df.columns.tolist())

# Extract column names
schemas = {
    "MySQL": mysql_df.columns,
    "PostgreSQL": postgres_df.columns,
    "Cassandra": cassandra_df.columns,
    "MongoDB": mongo_df.columns,
    "XML": xml_df.columns,
    "Web": web_df.columns
}


# Similarity scores for a pair of column names:
# - fuzz.ratio: string similarity on a 0-100 scale (a ratio, not the raw Levenshtein distance)
# - Jaccard: overlap of the case-insensitive character sets, 0-1
def compute_similarity(col1, col2):
    lev_score = fuzz.ratio(col1, col2)
    set1, set2 = set(col1.lower()), set(col2.lower())
    jac_score = len(set1 & set2) / len(set1 | set2)
    return lev_score, jac_score


# Schema Matching: compare each unordered pair of sources once, so every match
# is reported a single time instead of as both (A, B) and (B, A)
matched_schemas = []
for (db1, cols1), (db2, cols2) in combinations(schemas.items(), 2):
    for col1 in cols1:
        for col2 in cols2:
            lev_score, jac_score = compute_similarity(col1, col2)
            if lev_score > 80 or jac_score > 0.5:
                matched_schemas.append(
                    (db1, col1, db2, col2, lev_score, jac_score))

# Convert to DataFrame
matched_df = pd.DataFrame(matched_schemas, columns=[
                          "DB1", "Column1", "DB2", "Column2", "Levenshtein", "Jaccard"])
print(matched_df.head())

# Define a mediated global schema
global_schema = {
    "CustomerID": ["customerid", "clientid", "ClientID", "client_id", "_id"],
    "CustomerName": ["customername", "name", "Name"],
    "Contact": ["mobileinfo", "contact_number", "ContactInfo"],
    "Address": ["fulladdress", "Address", "address"],
    "ContractID": ["contractid", "ContractID", "contract_id"],
    "ContractStartDate": ["contractstartdate", "ContractStartDate", "start_date"],
    "ContractEndDate": ["contractenddate", "ContractEndDate", "end_date"],
    "BillingAmount": ["totalamount", "amountdue", "paymentstatus"],
    "Status": ["statusid", "status", "statusname", "classification"],
    "AreaType": ["areatype", "AreaType"],
    "Region": ["region", "Region"],  # Ensuring Region exists
    "WaterResourceID": ["sourceid", "ReservoirID"],
    "Waste": ["wastetype", "quantityinkg_x", "quantityinkg_y", "WasteQuantity", "WasteType"],
    "CustomerType": ["ConsumerType", "customer_type"],
    "Turbidity": ["turbidity", "Turbidity"],
    "phlevel": ["pH"],
    "Reservoir": ["Location"],
    "Reason": ["reasondescription"]
}
# GAV Mapping (Global-as-View)


def GAV_query():
    return "SELECT CustomerID, CustomerName, Contact, Address FROM GlobalSchema WHERE Status='Active'"


def GAV_query2():
    return "SELECT ContractID, ContractStartDate, ContractEndDate FROM GlobalSchema WHERE BillingAmount > 1000"


def GAV_query3():
    return "SELECT CustomerID, BillingAmount FROM GlobalSchema ORDER BY BillingAmount DESC"


gav_queries = [GAV_query(), GAV_query2(), GAV_query3()]
print("GAV Queries:", gav_queries)

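# LAV Mapping (Local-as-View): each source is described as a view over the
# global schema, listing the global attributes it provides and the local column
# holding each one (MongoDB column names are taken from the original queries)
lav_source_views = {
    "PostgreSQL": {"CustomerID": "clientid", "CustomerName": "name",
                   "Contact": "mobileinfo", "AreaType": "areatype"},
    "MySQL": {"CustomerID": "customerid", "CustomerName": "customername",
              "ContractID": "contractid", "ContractStartDate": "contractstartdate",
              "Reason": "reasondescription"},
    "MongoDB": {"CustomerID": "_id", "Address": "address",
                "CustomerType": "customer_type"},
}

# LAV queries are posed over the global schema: (selected attributes, equality filter)


def LAV_query():
    return (["CustomerID", "CustomerName", "Contact"], ("AreaType", "residential"))


def LAV_query2():
    return (["ContractID", "ContractStartDate"], ("Reason", "Late Payment"))


def LAV_query3():
    return (["CustomerID", "Address"], ("CustomerType", "Premium"))


def unfold_LAV(query):
    # Rewrite a global query into one SQL query per source whose view provides
    # every attribute the query references (simplified: projection + one equality filter)
    attributes, (filter_attr, value) = query
    needed = set(attributes) | {filter_attr}
    rewritings = []
    for source, view in lav_source_views.items():
        if needed.issubset(view.keys()):
            select = ", ".join(f"{view[a]} AS {a}" for a in attributes)
            rewritings.append(
                f"SELECT {select} FROM {source} WHERE {view[filter_attr]}='{value}'")
    return rewritings


lav_queries = [sql for q in (LAV_query(), LAV_query2(), LAV_query3())
               for sql in unfold_LAV(q)]
print("LAV Queries:", lav_queries)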

# GLAV Mapping (Global-Local-as-View): joins across sources mapped onto global
# attributes; join keys follow the printed columns (Cassandra: client_id, MySQL: customerid)


def GLAV_query():
    return "SELECT c.client_id AS CustomerID, c.name AS CustomerName, c.contact_number AS Contact FROM Cassandra c JOIN MongoDB m ON c.client_id = m._id"


def GLAV_query2():
    return "SELECT p.clientid AS CustomerID, p.name AS CustomerName FROM PostgreSQL p JOIN MySQL m ON p.clientid = m.customerid"


def GLAV_query3():
    return "SELECT x.id AS CustomerID, x.address AS Address FROM XML x JOIN Web w ON x.id = w.user_id"


glav_queries = [GLAV_query(), GLAV_query2(), GLAV_query3()]
print("GLAV Queries:", glav_queries)

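# Query Processing and Optimization (placeholder)


def optimize_query(query):
    # Placeholder only: a real optimizer would choose an execution plan (e.g. push
    # filters down to the sources, reorder joins); here the query is just tagged
    return query + " /* Optimized Execution Plan */"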


optimized_queries = [optimize_query(
    q) for q in gav_queries + lav_queries + glav_queries]
print("Optimized Queries:", optimized_queries)

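# Wrapper functions for query translation (naive string substitutions that only
# sketch the idea; they do not produce valid MongoDB or SQL for general input)


def sql_to_nosql(sql_query):
    return sql_query.replace("SELECT", "db.find({").replace("FROM", "})")


def jsonpath_to_sql(json_query):
    # "$.customers" -> "SELECT * FROM customers"
    return json_query.replace("$.", "SELECT * FROM ")


translated_nosql_queries = [sql_to_nosql(q) for q in gav_queries]
# jsonpath_to_sql expects JSONPath input (the GLAV queries are already SQL, so
# translating them was a no-op); these JSONPath paths are hypothetical examples
jsonpath_queries = ["$.customers", "$.contracts"]
translated_sql_queries = [jsonpath_to_sql(q) for q in jsonpath_queries]
print("Translated NoSQL Queries:", translated_nosql_queries)
print("Translated SQL Queries:", translated_sql_queries)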

MySQL Columns: ['customerid', 'customername', 'contractid', 'statusid', 'contractstartdate', 'contractenddate', 'statusname', 'policyid', 'policyname', 'policyinfo', 'publicationdate', 'effectivedate', 'fineid', 'reasonid', 'reasondescription', 'usageid', 'departmentid', 'sourceid', 'usagedate', 'volumeused', 'purpose', 'departmentname', 'equipmentid', 'equipmentname', 'lastinspectiondate', 'logid', 'maintenancedate', 'partsreplaced', 'techniciannotes', 'billingid', 'billingdate', 'amountdue', 'paymentstatus']
PostgreSQL Columns: ['clientid', 'name', 'fulladdress', 'postcode', 'areatypeid', 'mobileinfo', 'areatype', 'description_x', 'billingid', 'billingdate', 'totalamount', 'billingdetailid', 'wastecategoryid', 'quantityinkg_x', 'subtotal', 'wastetype', 'unitpriceperkg', 'description_y', 'frequencyid', 'lastcollectiondate', 'nextcollectiondate', 'collectionfrequency', 'description', 'disposalid', 'disposaldate', 'quantityinkg_y']
Cassandra Columns: ['client_id', 'address', 'area_type_

In [80]:
import pandas as pd
from fuzzywuzzy import fuzz

# Load CSVs
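# low_memory=False infers each column's dtype from the whole file, not per chunk
mysql_df = pd.read_csv('merged/merged_mysql.csv', low_memory=False)
postgres_df = pd.read_csv('merged/merged_postgres.csv', low_memory=False)
cassandra_df = pd.read_csv('merged/merged_cassandra.csv', low_memory=False)
mongo_df = pd.read_csv('merged/merged_mongo.csv', low_memory=False)
xml_df = pd.read_csv('merged/merged_xml.csv', low_memory=False)
web_df = pd.read_csv('merged/merged_web.csv', low_memory=False)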

# Print column names for debugging
print("MySQL Columns:", mysql_df.columns.tolist())
print("PostgreSQL Columns:", postgres_df.columns.tolist())
print("Cassandra Columns:", cassandra_df.columns.tolist())
print("MongoDB Columns:", mongo_df.columns.tolist())
print("XML Columns:", xml_df.columns.tolist())
print("Web Columns:", web_df.columns.tolist())

# Updated Global Schema Mapping (with correct column names)
global_schema = {
    "CustomerID": ["customerid", "clientid", "ClientID", "client_id", "_id"],
    "CustomerName": ["customername", "name", "Name"],
    "Contact": ["mobileinfo", "contact_number", "ContactInfo"],
    "Address": ["fulladdress", "Address", "address"],
    "ContractID": ["contractid", "ContractID", "contract_id"],
    "ContractStartDate": ["contractstartdate", "ContractStartDate", "start_date"],
    "ContractEndDate": ["contractenddate", "ContractEndDate", "end_date"],
    "BillingAmount": ["totalamount", "amountdue", "paymentstatus"],
    "Status": ["statusid", "status", "statusname", "classification"],
    "AreaType": ["areatype", "AreaType"],
    "Region": ["region", "Region"],  # Ensuring Region exists
    "WaterResourceID": ["sourceid", "ReservoirID"],
    "Waste": ["wastetype","quantityinkg_x", "quantityinkg_y", "WasteQuantity", "WasteType"],
    "CustomerType": ["ConsumerType", "customer_type"],
    "Turbidity": ["turbidity", "Turbidity"],
    "phlevel": ["pH"],
    "Reservoir":["Location"],
    "Reason": ["reasondescription"]
}

# Function to map global schema fields to actual column names in CSV


def get_actual_column(df, global_column):
    for possible_col in global_schema.get(global_column, []):
        if possible_col in df.columns:
            return possible_col
    return None  # If no match is found

# Function to execute queries on CSV DataFrames


def execute_query(df, select_cols, where_condition=None, order_by=None):
    # Simulates a simple query using global schema names: projection, an optional
    # case-insensitive equality filter (column, value) and a descending ORDER BY
    try:
        print(f"\nAvailable columns in DataFrame: {df.columns.tolist()}")

        # Map global schema columns to actual columns, skipping unmapped ones
        selected_cols = [actual for actual in (get_actual_column(df, col)
                                               for col in select_cols) if actual]

        if not selected_cols:
            print("No valid columns found for selection.")
            return pd.DataFrame()

        # Filter DataFrame: compare a lowercased copy of the column so the
        # caller's DataFrame is not modified
        if where_condition:
            col, value = where_condition
            actual_col = get_actual_column(df, col)
            if actual_col:
                print(f"Filtering on {actual_col} for value: {value}")
                mask = df[actual_col].astype(str).str.lower() == str(value).lower()
                df = df[mask]
            else:
                print(f"Warning: Column {col} not found for filtering. Skipping filter.")

        # Order By (descending)
        if order_by:
            actual_col = get_actual_column(df, order_by)
            if actual_col:
                df = df.sort_values(by=actual_col, ascending=False)
            else:
                print(f"Warning: Column {order_by} not found for ordering. Skipping order by.")

        # Select only required columns
        df = df[selected_cols]

        return df.head()  # Return first 5 results for preview

    except Exception as e:
        print(f"Error executing query: {e}")
        return pd.DataFrame()


# Simulate GAV queries with correct column names
print("\nExecuting GAV Queries:")
print(execute_query(postgres_df, ["CustomerName", "AreaType"],
      ("Waste", "Organic")))
print(execute_query(mysql_df, [
      "ContractID", "ContractStartDate", "ContractEndDate"], ("BillingAmount", "75.5")))
print(execute_query(postgres_df, ["CustomerName",
      "AreaType", "BillingAmount"], order_by="BillingAmount"))

# Simulate LAV queries with corrected filters
print("\nExecuting LAV Queries:")
print(execute_query(postgres_df, [
      "CustomerID", "CustomerName", "Contact","Waste"], ("AreaType", "residential")))
print(execute_query(mysql_df, ["CustomerName",
      "ContractID", "ContractStartDate"], ('Reason', 'Late Payment')))
# print(execute_query(web_df, ["WaterResourceID", "Turbidity", "phlevel"], ("Reservoir", "Reservoir_1")))


def execute_lav_query(df, group_by_col, agg_col, agg_func="mean"):
    try:
        actual_group_by_col = get_actual_column(df, group_by_col)
        actual_agg_col = get_actual_column(df, agg_col)

        if actual_group_by_col and actual_agg_col:
            result = df.groupby(actual_group_by_col)[
                actual_agg_col].agg(agg_func).reset_index()
            return result
        else:
            print(f"Warning: Column {group_by_col} or {agg_col} not found.")
            return pd.DataFrame()

    except Exception as e:
        print(f"Error executing LAV query: {e}")
        return pd.DataFrame()


# Execute LAV query for average Turbidity per Reservoir
print("\nExecuting LAV Query for Average Turbidity per Reservoir:")
print(execute_lav_query(web_df, "Reservoir", "Turbidity", "mean"))
# Simulate GLAV queries using pandas merge (join) with corrected column names
print("\nExecuting GLAV Queries:")

# Fix GLAV1 Join (disabled)
# glav1 = pd.merge(
#     cassandra_df,
#     mongo_df,
#     left_on=get_actual_column(cassandra_df, "CustomerID"),
#     right_on=get_actual_column(mongo_df, "CustomerID"),
#     how="inner"
# )
#
# print("GLAV1 Debug Columns:", glav1.columns.tolist())
# actual_cols1 = [get_actual_column(glav1, col) for col in [
#     "CustomerID", "CustomerName", "Contact"] if get_actual_column(glav1, col)]
# if actual_cols1:
#     print(glav1[actual_cols1].head())
# else:
#     print("GLAV1: No matching columns found.")

# Fix GLAV2 Join
glav2 = pd.merge(
    postgres_df,
    mysql_df,
    left_on=get_actual_column(postgres_df, "CustomerID"),
    right_on=get_actual_column(mysql_df, "CustomerID"),
    how="inner"
)

print("GLAV2 Debug Columns:", glav2.columns.tolist())
actual_cols2 = [get_actual_column(glav2, col) for col in [
    "CustomerID", "CustomerName"] if get_actual_column(glav2, col)]
if actual_cols2:
    print(glav2[actual_cols2].head())
else:
    print("GLAV2: No matching columns found.")

# Debugging: Print Available Columns Before Merging (disabled)
# print("\nXML DataFrame Columns:", xml_df.columns.tolist())
# print("Web DataFrame Columns:", web_df.columns.tolist())

# GLAV3 (disabled): find actual column names before merging
# xml_customer_col = get_actual_column(xml_df, "CustomerID")
# web_customer_col = get_actual_column(web_df, "CustomerID")
#
# # Check if both columns exist before merging
# if xml_customer_col and web_customer_col:
#     glav3 = pd.merge(xml_df, web_df, left_on=xml_customer_col,
#                      right_on=web_customer_col, how="inner")
#     print("GLAV3 Debug Columns:", glav3.columns.tolist())
#
#     # Select columns dynamically
#     actual_cols3 = [get_actual_column(glav3, col) for col in [
#         "CustomerID", "Address"] if get_actual_column(glav3, col)]
#     if actual_cols3:
#         print(glav3[actual_cols3].head())
#     else:
#         print("GLAV3: No matching columns found.")
# else:
#     print("\n GLAV3 Merge Skipped: Could not find common columns in XML and Web DataFrames!")
#     print(f"XML Customer Column: {xml_customer_col}, Web Customer Column: {web_customer_col}")

MySQL Columns: ['customerid', 'customername', 'contractid', 'statusid', 'contractstartdate', 'contractenddate', 'statusname', 'policyid', 'policyname', 'policyinfo', 'publicationdate', 'effectivedate', 'fineid', 'reasonid', 'reasondescription', 'usageid', 'departmentid', 'sourceid', 'usagedate', 'volumeused', 'purpose', 'departmentname', 'equipmentid', 'equipmentname', 'lastinspectiondate', 'logid', 'maintenancedate', 'partsreplaced', 'techniciannotes', 'billingid', 'billingdate', 'amountdue', 'paymentstatus']
PostgreSQL Columns: ['clientid', 'name', 'fulladdress', 'postcode', 'areatypeid', 'mobileinfo', 'areatype', 'description_x', 'billingid', 'billingdate', 'totalamount', 'billingdetailid', 'wastecategoryid', 'quantityinkg_x', 'subtotal', 'wastetype', 'unitpriceperkg', 'description_y', 'frequencyid', 'lastcollectiondate', 'nextcollectiondate', 'collectionfrequency', 'description', 'disposalid', 'disposaldate', 'quantityinkg_y']
Cassandra Columns: ['client_id', 'address', 'area_type_

In [47]:
glav2.head()

,clientid,name,fulladdress,postcode,areatypeid,mobileinfo,areatype,description_x,billingid_x,billingdate_x,...,equipmentname,lastinspectiondate,logid,maintenancedate,partsreplaced,techniciannotes,billingid_y,billingdate_y,amountdue,paymentstatus
0,1,Customer_1,"rua 1, Faro",801-00001,1,35191000001,residential,Areas primarily for housing and residential pu...,1,2025-01-31,...,Treatment Equipment 1,2020-02-05,1,2020-02-07,Part_2,Technician note 1,1.0,2020-03-06,75.5,0.0
1,1,Customer_1,"rua 1, Faro",801-00001,1,35191000001,residential,Areas primarily for housing and residential pu...,1,2025-01-31,...,Treatment Equipment 1,2020-02-05,51,2020-07-06,Part_2,Technician note 51,1.0,2020-03-06,75.5,0.0
2,1,Customer_1,"rua 1, Faro",801-00001,1,35191000001,residential,Areas primarily for housing and residential pu...,1,2025-01-31,...,Treatment Equipment 1,2020-02-05,101,2020-12-03,Part_2,Technician note 101,1.0,2020-03-06,75.5,0.0
3,1,Customer_1,"rua 1, Faro",801-00001,1,35191000001,residential,Areas primarily for housing and residential pu...,1,2025-01-31,...,Treatment Equipment 1,2020-02-05,151,2020-05-02,Part_2,Technician note 151,1.0,2020-03-06,75.5,0.0
4,1,Customer_1,"rua 1, Faro",801-00001,1,35191000001,residential,Areas primarily for housing and residential pu...,1,2025-01-31,...,Treatment Equipment 6,2020-02-10,6,2020-02-22,Part_2,Technician note 6,1.0,2020-03-06,75.5,0.0
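The preview repeats the same customer (clientid 1) with different logid values: at least one side of the join, for example the MySQL rows that differ in logid, has several rows per customer, so the inner join multiplies rows. A sketch of how one might make that explicit, assuming pandas: the validate argument of pd.merge raises MergeError when the declared join cardinality is violated, and aggregating the "many" side before joining restores one row per customer. The two toy frames are made up; only their column names come from the output above.

```python
import pandas as pd
from pandas.errors import MergeError

# Made-up frames reusing column names from the glav2 output
postgres = pd.DataFrame({"clientid": [1, 2], "name": ["Customer_1", "Customer_2"]})
mysql = pd.DataFrame({
    "customerid": [1, 1, 1, 2],
    "logid": [1, 51, 101, 2],
    "amountdue": [75.5, 75.5, 75.5, 40.0],
})

# Declaring one_to_one makes pandas check that the key is unique on both sides
try:
    pd.merge(postgres, mysql, left_on="clientid", right_on="customerid",
             how="inner", validate="one_to_one")
except MergeError as err:
    print("Cardinality check failed:", err)

# Either accept the fan-out explicitly ...
fanned_out = pd.merge(postgres, mysql, left_on="clientid", right_on="customerid",
                      how="inner", validate="one_to_many")

# ... or collapse the many side to one row per customer before joining
per_customer = (mysql.groupby("customerid", as_index=False)
                     .agg(log_count=("logid", "count"), amountdue=("amountdue", "first")))
one_row_each = pd.merge(postgres, per_customer, left_on="clientid",
                        right_on="customerid", how="inner", validate="one_to_one")

print(len(fanned_out), len(one_row_each))  # 4 2
```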
