<a href="https://colab.research.google.com/github/kiran-kusuma/A-Distributed-Computation-on-shared-resouces/blob/main/colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import pandas as pd
import json

# Specify the folder path containing the CSV files
folder_path = '/content/sample_data'

# Encoding used for every read of the source CSVs. Each file is loaded once and
# the same DataFrame is reused for relationship detection, so both passes see
# identically decoded data.
csv_encoding = 'latin-1'  # or 'utf-16', 'ISO-8859-1', etc.

# Create a dictionary to store columns and their types for each CSV
tables_info = {}

# Create a dictionary to store relationships between files and columns
relationships = {}

# Create a dictionary to store the metadata in JSON format
database_metadata = {}

# Loaded DataFrames keyed by file name, so each CSV is read from disk only once
table_frames = {}


def find_relationships(frames):
    """Find columns whose values overlap between pairs of tables.

    Two tables are related on a column when both have a column with that exact
    name and at least one non-null value appears in both. Each unordered pair
    of tables is examined once.

    Args:
        frames: Mapping of table (file) name to its loaded DataFrame.

    Returns:
        ``{table: {column: (other_table, column)}}``. When a column links a
        table to several others, the entry keeps the partner that comes last
        in *frames*' iteration order.
    """
    found = {}
    names = list(frames)
    for i, file1 in enumerate(names):
        df1 = frames[file1]
        for file2 in names[i + 1:]:
            df2 = frames[file2]
            for common_col in set(df1.columns).intersection(df2.columns):
                # Drop missing values first: NaN is not evidence of a shared key
                values1 = set(df1[common_col].dropna())
                if values1.intersection(df2[common_col].dropna()):
                    found.setdefault(file1, {})[common_col] = (file2, common_col)
                    found.setdefault(file2, {})[common_col] = (file1, common_col)
    return found


# Step 1: Load every CSV once and record its column names and dtypes
for filename in os.listdir(folder_path):
    full_file_path = os.path.join(folder_path, filename)

    # Check if it's a CSV file (based on the extension)
    if os.path.isfile(full_file_path) and filename.endswith('.csv'):
        print(f"Processing file: {filename}")
        df = pd.read_csv(full_file_path, encoding=csv_encoding)
        table_frames[filename] = df

        # Store the table name, columns, and their data types
        tables_info[filename] = df.dtypes.to_dict()

# Compare the loaded tables pairwise to find columns with shared values
relationships.update(find_relationships(table_frames))

# Step 2: Build the JSON metadata.
# Every table becomes {"columns": [...]}. Each column entry carries its name,
# its pandas dtype rendered as text and, when a relationship was detected for
# it, a "references" object naming the related table and column.
for table_file, dtype_by_column in tables_info.items():
    links = relationships.get(table_file, {})

    column_entries = []
    for column_name, column_dtype in dtype_by_column.items():
        entry = {"name": column_name, "type": str(column_dtype)}
        if column_name in links:
            ref_table, ref_column = links[column_name]
            entry["references"] = {"table": ref_table, "column": ref_column}
        column_entries.append(entry)

    # Add the table metadata to the database
    database_metadata[table_file] = {"columns": column_entries}

# Persist the collected metadata as indented JSON
json_schema_file_path = '/content/database_metadata.json'
with open(json_schema_file_path, 'w') as schema_out:
    json.dump(database_metadata, schema_out, indent=4)

print(f"Metadata has been saved to {json_schema_file_path}")


Processing file: globalwindmastsinventorydata_nov_9_2020.csv
Processing file: california_housing_train.csv
Metadata has been saved to /content/database_metadata.json


In [None]:
# Re-read the file at `full_file_path`, which still holds the last directory
# entry visited by the schema-discovery loop.
# NOTE(review): that entry is whatever os.listdir returned last and may not be a
# CSV file - confirm before relying on this cell.
df = pd.read_csv(
    full_file_path,
    encoding='latin-1',  # or 'utf-16', 'ISO-8859-1', etc.
)

In [None]:
import pandas as pd
import os
from pathlib import Path
import numpy as np
import json

def _allocate_shards(row_counts, shard_count):
    """Split *shard_count* shards among tables so rows per shard stay balanced.

    Every table starts with one shard. Each remaining shard goes to the table
    whose current rows-per-shard ratio is largest; ``np.argmax`` picks the
    earliest table on ties.

    Args:
        row_counts: Row count of each table, in table order.
        shard_count: Total number of shards to hand out.

    Returns:
        Number of shards per table. The values sum to
        ``max(shard_count, len(row_counts))``.
    """
    allocation = [1] * len(row_counts)
    for _ in range(shard_count - len(row_counts)):
        ratios = [rows / shards for rows, shards in zip(row_counts, allocation)]
        allocation[int(np.argmax(ratios))] += 1
    return allocation


def _split_rows(row_count, parts):
    """Return half-open ``(start, stop)`` ranges cutting *row_count* rows into *parts* slices.

    The slices are contiguous and cover every row exactly once. The first
    ``row_count % parts`` slices each get one extra row.
    """
    base, extra = divmod(row_count, parts)
    ranges = []
    start = 0
    for j in range(parts):
        stop = start + base + (1 if j < extra else 0)
        ranges.append((start, stop))
        start = stop
    return ranges


def shard_database(input_folder, output_folder, shard_count):
    """Split every CSV table in *input_folder* into contiguous row shards.

    Shards are allocated to tables in proportion to their size (see
    ``_allocate_shards``). Each shard holds a contiguous, non-overlapping slice
    of exactly one table and is written to ``<output_folder>/shard_<i>.csv``.
    ``<output_folder>/shard_metadata.json`` records, per table, its total row
    count, shard count, and each shard's index, row range and row count.

    Args:
        input_folder: Folder containing the source ``*.csv`` tables.
        output_folder: Destination folder; created if it does not exist.
        shard_count: Total number of shards to produce.

    Raises:
        ValueError: If *input_folder* contains no CSV files, or if
            *shard_count* is smaller than the number of tables.
    """
    # Step 1: Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    # Step 2: Load all tables (CSV files), keyed by file stem
    tables = {}
    for file_path in Path(input_folder).glob('*.csv'):
        tables[file_path.stem] = pd.read_csv(file_path)
    table_names = list(tables)
    table_row_counts = {name: len(df) for name, df in tables.items()}
    print(table_row_counts)

    if not table_names:
        raise ValueError(f"No CSV files found in {input_folder}")
    # A shard file is read back whole as one table, so two tables must never
    # share a shard: every table needs at least one shard of its own.
    if shard_count < len(table_names):
        raise ValueError(
            f"shard_count ({shard_count}) must be at least the number of "
            f"tables ({len(table_names)})"
        )

    # Step 3: Decide how many shards each table gets
    N_values = _allocate_shards([table_row_counts[name] for name in table_names], shard_count)
    print(N_values)

    # Step 4: Give each table a run of consecutive shard indices, each shard
    # covering the next contiguous slice of that table's rows
    shard_slices = {}  # shard index -> (table name, start row, stop row)
    table_metadata = {}  # per-table metadata written to shard_metadata.json
    current_shard_index = 0
    for table_name, table_shards in zip(table_names, N_values):
        table_row_count = table_row_counts[table_name]
        shard_entries = []
        for start, stop in _split_rows(table_row_count, table_shards):
            rows_to_allocate = stop - start
            shard_entries.append({
                'shard_index': current_shard_index,
                'row_range': f"{start}-{stop - 1}",
                'total_rows': rows_to_allocate,
            })
            shard_slices[current_shard_index] = (table_name, start, stop)
            print(f"Table: {table_name} | Shard {current_shard_index} | Rows Allocated: {rows_to_allocate}")
            current_shard_index += 1
        table_metadata[table_name] = {
            'total_rows': table_row_count,
            'total_shards': table_shards,
            'shards': shard_entries,
        }

    # Step 5: Save each shard's own slice of rows (not the table's head) to CSV
    for shard_index, (table_name, start, stop) in shard_slices.items():
        shard_file_path = os.path.join(output_folder, f'shard_{shard_index}.csv')
        tables[table_name].iloc[start:stop].to_csv(shard_file_path, index=False)
        print(f"Shard {shard_index} saved to {shard_file_path}")

    # Step 6: Write the metadata file (JSON)
    metadata_file_path = os.path.join(output_folder, 'shard_metadata.json')
    with open(metadata_file_path, 'w') as json_file:
        json.dump(table_metadata, json_file, indent=4)

    print(f"Metadata saved to {metadata_file_path}")


# Example usage: split the sample tables into 20 shard files plus metadata.
input_folder = '/content/sample_data'        # source CSV tables
output_folder = '/content/sharded_database'  # receives shard_<i>.csv and shard_metadata.json
shard_count = 20                             # total number of shards across all tables

shard_database(input_folder, output_folder, shard_count)


{'WDIfootnote': 788559, 'WDIcountry-series': 8237, 'WDICSV': 397936, 'WDICountry': 265, 'WDIseries-time': 148, 'WDISeries': 1453}
[10, 1, 6, 1, 1, 1]
Table: WDIfootnote | Shard 0 | Rows Allocated: 78856
Table: WDIfootnote | Shard 1 | Rows Allocated: 78856
Table: WDIfootnote | Shard 2 | Rows Allocated: 78856
Table: WDIfootnote | Shard 3 | Rows Allocated: 78856
Table: WDIfootnote | Shard 4 | Rows Allocated: 78856
Table: WDIfootnote | Shard 5 | Rows Allocated: 78856
Table: WDIfootnote | Shard 6 | Rows Allocated: 78856
Table: WDIfootnote | Shard 7 | Rows Allocated: 78856
Table: WDIfootnote | Shard 8 | Rows Allocated: 78856
Table: WDIfootnote | Shard 9 | Rows Allocated: 78855
Table: WDIcountry-series | Shard 10 | Rows Allocated: 8237
Table: WDICSV | Shard 11 | Rows Allocated: 66323
Table: WDICSV | Shard 12 | Rows Allocated: 66323
Table: WDICSV | Shard 13 | Rows Allocated: 66323
Table: WDICSV | Shard 14 | Rows Allocated: 66323
Table: WDICSV | Shard 15 | Rows Allocated: 66322
Table: WDICSV | 

In [None]:
import pandas as pd
import json
import os
from pathlib import Path
from typing import List, Dict, Any, Union
import sqlparse
from sqlparse.sql import Where, Comparison, Identifier, Token
import re

class DistributedQueryEngine:
    """Run simple single-table SELECT queries over a sharded CSV database.

    Rows live in ``shard_<i>.csv`` files under *database_path*; the shard
    metadata JSON maps each table name to the shards holding its rows. The
    supported syntax is ``SELECT <* | col[, col ...]> FROM <table>
    [WHERE col op value ...]``. Every comparison found directly in the WHERE
    clause is applied in turn, so multiple conditions combine with AND.
    """

    # WHERE operators mapped to the pandas comparison each one performs.
    _COMPARATORS = {
        '=': lambda column, value: column == value,
        '!=': lambda column, value: column != value,
        '<>': lambda column, value: column != value,
        '>': lambda column, value: column > value,
        '<': lambda column, value: column < value,
        '>=': lambda column, value: column >= value,
        '<=': lambda column, value: column <= value,
    }

    def __init__(self, database_path: str, metadata_path: str, shard_metadata_path: str):
        """
        Initialize the distributed query engine.

        Args:
            database_path: Path to the folder containing sharded CSV files
            metadata_path: Path to the database schema metadata JSON file
            shard_metadata_path: Path to the shard distribution metadata JSON file
        """
        self.database_path = Path(database_path)
        self.schema = self._load_json(metadata_path)
        self.shard_metadata = self._load_json(shard_metadata_path)

    def _load_json(self, path: str) -> Dict:
        """Load and parse a JSON file."""
        with open(path, 'r') as f:
            return json.load(f)

    def _normalize_column_name(self, column_name: str) -> str:
        """Return *column_name* with underscores replaced by spaces.

        A bare SQL identifier cannot contain a space, so queries may write
        ``Country_Code`` to mean the CSV column ``Country Code``.
        """
        return column_name.replace("_", " ")

    def _resolve_column(self, df: pd.DataFrame, column: str) -> str:
        """Return the column of *df* that the query name *column* refers to.

        The name is tried verbatim first, so real columns containing
        underscores keep working, and then in its underscore-to-space form.

        Raises:
            KeyError: If neither spelling is a column of *df*.
        """
        if column in df.columns:
            return column
        spaced = self._normalize_column_name(column)
        if spaced in df.columns:
            return spaced
        raise KeyError(column)

    def _parse_where_clause(self, where_clause: Where) -> Dict[str, Any]:
        """Extract ``{column: {'value': str, 'operator': str}}`` from a WHERE clause.

        Only Comparison tokens directly inside the clause are read. The
        literal keeps its string form with surrounding quotes removed; column
        names are kept as written and matched to real columns when applied.
        """
        conditions = {}
        if where_clause:
            for token in where_clause.tokens:
                if isinstance(token, Comparison):
                    left = str(token.left).strip()
                    right = str(token.right).strip().strip("'").strip('"')
                    # The token following the left operand is the operator
                    operator = str(token.token_next(0)[1]).strip().upper()
                    conditions[left] = {'value': right, 'operator': operator}
        return conditions

    def _resolve_table_name(self, table_name: str) -> str:
        """Map a query's table name onto a key of the shard metadata.

        Shard metadata is keyed by file stem (e.g. ``WDICountry``); a name
        written with its ``.csv`` extension is accepted as well. Unknown names
        are returned unchanged so the shard lookup reports them.
        """
        if table_name not in self.shard_metadata and table_name.lower().endswith('.csv'):
            stem = table_name[:-len('.csv')]
            if stem in self.shard_metadata:
                return stem
        return table_name

    def _identify_relevant_shards(self, table_name: str, conditions: Dict) -> List[int]:
        """Identify which shards need to be queried based on conditions."""
        if table_name not in self.shard_metadata:
            raise ValueError(f"Table {table_name} not found in shard metadata")

        # For now, return all shards - this could be optimized based on conditions
        return [shard['shard_index'] for shard in self.shard_metadata[table_name]['shards']]

    def _read_shard(self, shard_index: int) -> pd.DataFrame:
        """Read a specific shard file."""
        shard_path = self.database_path / f'shard_{shard_index}.csv'
        return pd.read_csv(shard_path)

    @staticmethod
    def _coerce_value(series: pd.Series, raw: str) -> Any:
        """Convert a WHERE literal so it compares meaningfully with *series*.

        Numeric (non-boolean) columns receive the literal as a float, so that
        ``year = 2020`` matches integer data. Literals that do not parse as a
        number, and literals for non-numeric columns, stay strings.
        """
        if pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series):
            try:
                return float(raw)
            except ValueError:
                return raw
        return raw

    def _apply_conditions(self, df: pd.DataFrame, conditions: Dict) -> pd.DataFrame:
        """Apply WHERE conditions to the DataFrame, keeping rows that satisfy all of them.

        Raises:
            ValueError: If a condition uses an operator this engine does not
                support (instead of silently returning unfiltered rows).
            KeyError: If a condition names a column *df* does not have.
        """
        for column, condition in conditions.items():
            operator = condition['operator']
            compare = self._COMPARATORS.get(operator)
            if compare is None:
                raise ValueError(f"Unsupported operator in WHERE clause: {operator}")

            actual = self._resolve_column(df, column)
            value = self._coerce_value(df[actual], condition['value'])
            df = df[compare(df[actual], value)]

        return df

    @staticmethod
    def _extract_table_name(parsed) -> str:
        """Return the first identifier after FROM.

        Raises:
            ValueError: If the statement has no FROM identifier.
        """
        from_seen = False
        for token in parsed.tokens:
            if from_seen and isinstance(token, Identifier):
                return str(token).strip()
            if str(token).upper() == 'FROM':
                from_seen = True
        raise ValueError("Could not identify table name in query")

    @staticmethod
    def _extract_select_columns(parsed) -> List[str]:
        """Return the column names listed between SELECT and FROM.

        ``SELECT *`` yields an empty list, meaning "all columns". Multiple
        columns are grouped by sqlparse into an IdentifierList.
        """
        from sqlparse.sql import IdentifierList  # only needed for this lookup

        columns = []
        select_seen = False
        for token in parsed.tokens:
            keyword = str(token).upper()
            if keyword == 'FROM':
                break
            if select_seen:
                if isinstance(token, IdentifierList):
                    columns.extend(str(ident).strip() for ident in token.get_identifiers())
                elif isinstance(token, Identifier):
                    columns.append(str(token).strip())
            elif keyword == 'SELECT':
                select_seen = True
        return columns

    def execute_query(self, query: str) -> pd.DataFrame:
        """
        Execute a simplified SQL query across the distributed database.

        Currently supports basic SELECT queries with WHERE clauses.

        Args:
            query: SQL query string (simplified format)

        Returns:
            pandas DataFrame with query results

        Raises:
            ValueError: For an empty query, a missing or unknown table, or an
                unsupported WHERE operator.
            KeyError: If a referenced column does not exist.
        """
        statements = sqlparse.parse(query)
        if not statements:
            raise ValueError("Empty query")
        parsed = statements[0]

        table_name = self._resolve_table_name(self._extract_table_name(parsed))

        # Extract WHERE conditions if present
        where_clause = next((token for token in parsed.tokens if isinstance(token, Where)), None)
        conditions = self._parse_where_clause(where_clause) if where_clause else {}

        # Query every shard of the table and filter each one
        results = []
        for shard_index in self._identify_relevant_shards(table_name, conditions):
            shard_df = self._read_shard(shard_index)
            if conditions:
                shard_df = self._apply_conditions(shard_df, conditions)
            results.append(shard_df)

        if not results:
            return pd.DataFrame()

        final_result = pd.concat(results, ignore_index=True)

        select_columns = self._extract_select_columns(parsed)
        if select_columns and select_columns != ['*']:
            final_result = final_result[
                [self._resolve_column(final_result, column) for column in select_columns]
            ]

        return final_result

    def get_table_schema(self, table_name: str) -> Dict:
        """Get the schema information for a specific table.

        The schema file is keyed by CSV file name (``WDICountry.csv``) while
        queries and shard metadata use the bare name (``WDICountry``), so both
        spellings are accepted.

        Raises:
            ValueError: If the table is not in the schema under either name.
        """
        for candidate in (table_name, f"{table_name}.csv"):
            if candidate in self.schema:
                return self.schema[candidate]
        raise ValueError(f"Table {table_name} not found in schema")

# Example usage:
if __name__ == "__main__":
    # Point the engine at the outputs of the schema-discovery and sharding cells
    engine = DistributedQueryEngine(
        database_path="/content/sharded_database",
        metadata_path="/content/database_metadata.json",
        shard_metadata_path="/content/sharded_database/shard_metadata.json",
    )

    example_queries = (
        "SELECT * FROM WDICountry.csv WHERE Country_Code = 'USA'",
    )

    # Run each query independently; a failure is reported and the loop moves on
    for sql in example_queries:
        try:
            print(f"\nExecuting query: {sql}")
            frame = engine.execute_query(sql)
            print(f"Results shape: {frame.shape}")
            print(frame.head())
        except Exception as exc:
            print(f"Error executing query: {str(exc)}")


Executing query: SELECT * FROM WDICountry WHERE Country Code = 'USA'
Error executing query: 'Code'


In [None]:
import pandas as pd
import json
import os
from pathlib import Path
from typing import List, Dict, Any, Union
import sqlparse
from sqlparse.sql import Where, Comparison, Identifier, Token
import re

class DistributedQueryEngine:
    """Run simple single-table SELECT queries over a sharded CSV database.

    Rows live in ``shard_<i>.csv`` files under *database_path*; the shard
    metadata JSON maps each table name to the shards holding its rows. The
    supported syntax is ``SELECT <* | col[, col ...]> FROM <table>
    [WHERE col op value ...]``. Every comparison found directly in the WHERE
    clause is applied in turn, so multiple conditions combine with AND.
    """

    # WHERE operators mapped to the pandas comparison each one performs.
    _COMPARATORS = {
        '=': lambda column, value: column == value,
        '!=': lambda column, value: column != value,
        '<>': lambda column, value: column != value,
        '>': lambda column, value: column > value,
        '<': lambda column, value: column < value,
        '>=': lambda column, value: column >= value,
        '<=': lambda column, value: column <= value,
    }

    def __init__(self, database_path: str, metadata_path: str, shard_metadata_path: str):
        """
        Initialize the distributed query engine.

        Args:
            database_path: Path to the folder containing sharded CSV files
            metadata_path: Path to the database schema metadata JSON file
            shard_metadata_path: Path to the shard distribution metadata JSON file
        """
        self.database_path = Path(database_path)
        self.schema = self._load_json(metadata_path)
        self.shard_metadata = self._load_json(shard_metadata_path)

    def _load_json(self, path: str) -> Dict:
        """Load and parse a JSON file."""
        with open(path, 'r') as f:
            return json.load(f)

    def _normalize_column_name(self, column_name: str) -> str:
        """Return *column_name* with underscores replaced by spaces.

        A bare SQL identifier cannot contain a space, so queries may write
        ``Country_Code`` to mean the CSV column ``Country Code``.
        """
        return column_name.replace("_", " ")

    def _resolve_column(self, df: pd.DataFrame, column: str) -> str:
        """Return the column of *df* that the query name *column* refers to.

        The name is tried verbatim first, so real columns containing
        underscores keep working, and then in its underscore-to-space form.

        Raises:
            KeyError: If neither spelling is a column of *df*.
        """
        if column in df.columns:
            return column
        spaced = self._normalize_column_name(column)
        if spaced in df.columns:
            return spaced
        raise KeyError(column)

    def _parse_where_clause(self, where_clause: Where) -> Dict[str, Any]:
        """Extract ``{column: {'value': str, 'operator': str}}`` from a WHERE clause.

        Only Comparison tokens directly inside the clause are read. The
        literal keeps its string form with surrounding quotes removed; column
        names are kept as written and matched to real columns when applied.
        """
        conditions = {}
        if where_clause:
            for token in where_clause.tokens:
                if isinstance(token, Comparison):
                    left = str(token.left).strip()
                    right = str(token.right).strip().strip("'").strip('"')
                    # The token following the left operand is the operator
                    operator = str(token.token_next(0)[1]).strip().upper()
                    conditions[left] = {'value': right, 'operator': operator}
        return conditions

    def _resolve_table_name(self, table_name: str) -> str:
        """Map a query's table name onto a key of the shard metadata.

        Shard metadata is keyed by file stem (e.g. ``WDICountry``); a name
        written with its ``.csv`` extension is accepted as well. Unknown names
        are returned unchanged so the shard lookup reports them.
        """
        if table_name not in self.shard_metadata and table_name.lower().endswith('.csv'):
            stem = table_name[:-len('.csv')]
            if stem in self.shard_metadata:
                return stem
        return table_name

    def _identify_relevant_shards(self, table_name: str, conditions: Dict) -> List[int]:
        """Identify which shards need to be queried based on conditions."""
        if table_name not in self.shard_metadata:
            raise ValueError(f"Table {table_name} not found in shard metadata")

        # For now, return all shards - this could be optimized based on conditions
        return [shard['shard_index'] for shard in self.shard_metadata[table_name]['shards']]

    def _read_shard(self, shard_index: int) -> pd.DataFrame:
        """Read a specific shard file."""
        shard_path = self.database_path / f'shard_{shard_index}.csv'
        return pd.read_csv(shard_path)

    @staticmethod
    def _coerce_value(series: pd.Series, raw: str) -> Any:
        """Convert a WHERE literal so it compares meaningfully with *series*.

        Numeric (non-boolean) columns receive the literal as a float, so that
        ``year = 2020`` matches integer data. Literals that do not parse as a
        number, and literals for non-numeric columns, stay strings.
        """
        if pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series):
            try:
                return float(raw)
            except ValueError:
                return raw
        return raw

    def _apply_conditions(self, df: pd.DataFrame, conditions: Dict) -> pd.DataFrame:
        """Apply WHERE conditions to the DataFrame, keeping rows that satisfy all of them.

        Raises:
            ValueError: If a condition uses an operator this engine does not
                support (instead of silently returning unfiltered rows).
            KeyError: If a condition names a column *df* does not have.
        """
        for column, condition in conditions.items():
            operator = condition['operator']
            compare = self._COMPARATORS.get(operator)
            if compare is None:
                raise ValueError(f"Unsupported operator in WHERE clause: {operator}")

            actual = self._resolve_column(df, column)
            value = self._coerce_value(df[actual], condition['value'])
            df = df[compare(df[actual], value)]

        return df

    @staticmethod
    def _extract_table_name(parsed) -> str:
        """Return the first identifier after FROM.

        Raises:
            ValueError: If the statement has no FROM identifier.
        """
        from_seen = False
        for token in parsed.tokens:
            if from_seen and isinstance(token, Identifier):
                return str(token).strip()
            if str(token).upper() == 'FROM':
                from_seen = True
        raise ValueError("Could not identify table name in query")

    @staticmethod
    def _extract_select_columns(parsed) -> List[str]:
        """Return the column names listed between SELECT and FROM.

        ``SELECT *`` yields an empty list, meaning "all columns". Multiple
        columns are grouped by sqlparse into an IdentifierList.
        """
        from sqlparse.sql import IdentifierList  # only needed for this lookup

        columns = []
        select_seen = False
        for token in parsed.tokens:
            keyword = str(token).upper()
            if keyword == 'FROM':
                break
            if select_seen:
                if isinstance(token, IdentifierList):
                    columns.extend(str(ident).strip() for ident in token.get_identifiers())
                elif isinstance(token, Identifier):
                    columns.append(str(token).strip())
            elif keyword == 'SELECT':
                select_seen = True
        return columns

    def execute_query(self, query: str) -> pd.DataFrame:
        """
        Execute a simplified SQL query across the distributed database.

        Currently supports basic SELECT queries with WHERE clauses.

        Args:
            query: SQL query string (simplified format)

        Returns:
            pandas DataFrame with query results

        Raises:
            ValueError: For an empty query, a missing or unknown table, or an
                unsupported WHERE operator.
            KeyError: If a referenced column does not exist.
        """
        statements = sqlparse.parse(query)
        if not statements:
            raise ValueError("Empty query")
        parsed = statements[0]

        table_name = self._resolve_table_name(self._extract_table_name(parsed))

        # Extract WHERE conditions if present
        where_clause = next((token for token in parsed.tokens if isinstance(token, Where)), None)
        conditions = self._parse_where_clause(where_clause) if where_clause else {}

        # Query every shard of the table and filter each one
        results = []
        for shard_index in self._identify_relevant_shards(table_name, conditions):
            shard_df = self._read_shard(shard_index)
            if conditions:
                shard_df = self._apply_conditions(shard_df, conditions)
            results.append(shard_df)

        if not results:
            return pd.DataFrame()

        final_result = pd.concat(results, ignore_index=True)

        select_columns = self._extract_select_columns(parsed)
        if select_columns and select_columns != ['*']:
            final_result = final_result[
                [self._resolve_column(final_result, column) for column in select_columns]
            ]

        return final_result

    def get_table_schema(self, table_name: str) -> Dict:
        """Get the schema information for a specific table.

        The schema file is keyed by CSV file name (``WDICountry.csv``) while
        queries and shard metadata use the bare name (``WDICountry``), so both
        spellings are accepted.

        Raises:
            ValueError: If the table is not in the schema under either name.
        """
        for candidate in (table_name, f"{table_name}.csv"):
            if candidate in self.schema:
                return self.schema[candidate]
        raise ValueError(f"Table {table_name} not found in schema")

# Example usage:
if __name__ == "__main__":
    # Point the engine at the outputs of the schema-discovery and sharding cells
    engine = DistributedQueryEngine(
        database_path="/content/sharded_database",
        metadata_path="/content/database_metadata.json",
        shard_metadata_path="/content/sharded_database/shard_metadata.json",
    )

    example_queries = (
        "SELECT * FROM WDICountry WHERE Country_Code = 'USA'",
    )

    # Run each query independently; a failure is reported and the loop moves on
    for sql in example_queries:
        try:
            print(f"\nExecuting query: {sql}")
            frame = engine.execute_query(sql)
            print(f"Results shape: {frame.shape}")
            print(frame.head())
        except Exception as exc:
            print(f"Error executing query: {str(exc)}")



Executing query: SELECT * FROM WDICountry WHERE Country_Code = 'USA'
Country Code
Results shape: (1, 31)
  Country Code     Short Name     Table Name                 Long Name  \
0          USA  United States  United States  United States of America   

  2-alpha code Currency Unit Special Notes         Region Income Group  \
0           US   U.S. dollar           NaN  North America  High income   

  WB-2 code  ...    Government Accounting concept  \
0        US  ...  Consolidated central government   

                     IMF data dissemination standard Latest population census  \
0  Special Data Dissemination Standard Plus (SDDS...          2020 (expected)   

  Latest household survey Source of most recent Income and expenditure data  \
0                     NaN                    Labor force survey (LFS), 2016   

  Vital registration complete  Latest agricultural census  \
0                         Yes                        2012   

   Latest industrial data Latest trade data 

In [None]:
import os
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify
from werkzeug.utils import secure_filename
import shutil
import json
import pandas as pd
from pathlib import Path

import secrets  # used only for the fallback secret key below

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads/'
app.config['ALLOWED_EXTENSIONS'] = {'zip', 'tar', 'tar.gz'}
# Signs the session cookie used by flash(). Read it from the environment instead
# of hard-coding a shared placeholder; fall back to a random per-process key.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or secrets.token_hex(32)

# Query engine used by the /execute_query route.
# NOTE(review): nothing in this cell assigns a real engine, so /execute_query
# redirects to the upload page until one is attached (e.g. a DistributedQueryEngine).
engine = None


def allowed_file(filename, allowed_extensions=None):
    """Return True if *filename* ends with one of the allowed archive extensions.

    Matching is case-insensitive and works for multi-part extensions such as
    ``tar.gz``. Checking only the text after the last dot would see ``gz``
    and could never match them.

    Args:
        filename: Name of the uploaded file.
        allowed_extensions: Extensions without the leading dot; defaults to
            ``app.config['ALLOWED_EXTENSIONS']``.
    """
    if allowed_extensions is None:
        allowed_extensions = app.config['ALLOWED_EXTENSIONS']
    name = filename.lower()
    return any(name.endswith('.' + ext.lower()) for ext in allowed_extensions)


def extract_folder(zip_file, folder_name):
    """Unpack an uploaded archive into ``<UPLOAD_FOLDER>/<folder_name>``.

    Args:
        zip_file: Path to the archive; ``shutil.unpack_archive`` infers its
            format from the file extension.
        folder_name: Destination sub-folder name. It is passed through
            ``secure_filename`` so that user input such as ``../..`` cannot
            point outside the upload folder.

    Returns:
        The path of the folder the archive was extracted into.

    Raises:
        ValueError: If *folder_name* is empty after sanitizing.
    """
    safe_name = secure_filename(folder_name)
    if not safe_name:
        raise ValueError(f"Invalid folder name: {folder_name!r}")

    target_dir = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
    os.makedirs(target_dir, exist_ok=True)
    # NOTE(review): on older Pythons tar members are not path-checked by
    # default; consider passing filter='data' where shutil supports it.
    shutil.unpack_archive(zip_file, target_dir)
    return target_dir


@app.route('/')
def index():
    """Serve the landing page: the archive upload form (``upload.html``)."""
    template_name = 'upload.html'
    return render_template(template_name)


@app.route('/upload', methods=['POST'])
def upload_database():
    """Accept an archive upload and unpack it into a per-company folder.

    Expects a multipart form with a ``file`` field (.zip/.tar/.tar.gz archive)
    and a ``company_name`` field. On success the archive is saved to the upload
    folder, extracted to ``<UPLOAD_FOLDER>/<company_name>``, and the user is
    redirected to the query page. On any validation failure a message is
    flashed and the user is sent back to the upload form.
    """
    # This route accepts only POST, so failures must go back to the GET form
    # page; redirecting to request.url would be answered with 405.
    upload_form = url_for('index')

    if 'file' not in request.files:
        flash('No file part')
        return redirect(upload_form)

    file = request.files['file']

    if file.filename == '':
        flash('No selected file')
        return redirect(upload_form)

    # Validate the sanitized name: it is the name the archive is saved under,
    # and unpack_archive relies on its extension.
    filename = secure_filename(file.filename)
    if not (file and allowed_file(filename)):
        flash('Invalid file format. Only .zip, .tar and .tar.gz archives are allowed.')
        return redirect(upload_form)

    # company_name becomes a directory name, so it must not contain path parts
    company_name = secure_filename(request.form.get('company_name', ''))
    if not company_name:
        flash('A valid company name is required')
        return redirect(upload_form)

    # Save the uploaded archive, creating the upload folder on first use
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)

    # Extract contents of the archive to the folder with company name
    extract_folder(file_path, company_name)

    flash(f"Database for {company_name} uploaded successfully!")
    return redirect(url_for('execute_query'))


@app.route('/execute_query', methods=['GET', 'POST'])
def execute_query():
    """Show the query form and, on POST, run the submitted query.

    Without a query engine the user is sent back to the upload page. A
    successful query is rendered as a Bootstrap-styled HTML table; any error
    is flashed and the user is redirected back to this page.
    """
    if engine is None:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('execute_query.html')

    sql_text = request.form['query']
    try:
        frame = engine.execute_query(sql_text)
        table_html = frame.to_html(classes="table table-bordered table-striped", index=False)
        return render_template('execute_query.html', result_html=table_html, query=sql_text)
    except Exception as exc:
        flash(f"Error executing query: {exc}")
        return redirect(request.url)


if __name__ == '__main__':
    # Keep debug mode but disable the stat reloader (the "Restarting with stat"
    # line in the cell output): it restarts the process from its command line,
    # which cannot re-create an app defined inside a notebook cell.
    app.run(debug=True, use_reloader=False)


 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug: * Restarting with stat
