In [1]:
import sys
import psycopg2 as ps

!{sys.executable} -m pip install postgres_csv_uploader
from postgres_csv_uploader.uploader import PostgresCSVUploader

[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m22.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.9 -m pip install --upgrade pip[0m


In [2]:
!{sys.executable} -m pip install ipdb

[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m22.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.9 -m pip install --upgrade pip[0m


In [3]:
import re
from io import StringIO
from typing import Dict, List, Optional, Tuple

import pandas as pd
import psycopg2 as ps
import psycopg2.sql as sql
from psycopg2.extensions import connection


class PostgresCSVUploader:
    def __init__(self, conn: connection):
        self.conn = conn
        self.data = None
        self.buffer = StringIO()
        self.py_2_sql_map = {
            "uint8": "SMALLINT",
            "uint16": "SMALLINT",
            "uint32": "INTEGER",
            "uint64": "BIGINT",
            "int": "INTEGER",
            "int8": "SMALLINT",
            "int16": "INTEGER",
            "int32": "INTEGER",
            "int64": "BIGINT",
            "complex128": "VARCHAR",
            "complex64": "VARCHAR",
            "str": "VARCHAR",
            "object": "VARCHAR",
            "category": "VARCHAR",
            "Decimal": "NUMERIC",
            "float": "FLOAT",
            "float16": "FLOAT",
            "float32": "FLOAT",
            "float64": "DOUBLE PRECISION",
            "datetime": "TIMESTAMP",
            "time": "TIME",
            "date": "DATE",
            "datetime64": "DATE",
            "bytes": "BYTEA",
            "void": "BYTEA",
            "bool": "BOOLEAN",
            "timedelta": "INTERVAL",
            "timedelta64": "INTERVAL",
            "list": "ARRAY",
            "dict": "JSON",
        }

    @classmethod
    def from_new_connection(
        cls, host: str, user: str, password: str, port: str, database: Optional[str]
    ):
        conn = ps.connect(
            host=host,
            database=database if database else user,
            user=user,
            password=password,
            port=port,
        )
        return cls(conn)

    def create_table(
        self,
        filepath: str,
        table: str,
        index_col: Optional[str] = None,
        datetime_cols: Optional[List[str]] = None,
    ) -> None:
        cur = self.conn.cursor()
        columns = self.create_table_schema(filepath, index_col, datetime_cols)
        sanitized_cols = [
            sql.Identifier(c[0]).as_string(cur) + f" {c[1]}" for c in columns
        ]
        sanitized_cols[0] += " PRIMARY KEY"
        delete_query = sql.SQL("DROP TABLE IF EXISTS {0};").format(
            sql.Identifier(table)
        )
        query = sql.SQL("CREATE TABLE {0} ({1});").format(
            sql.Identifier(table), sql.SQL(",".join(sanitized_cols))
        )
        cur.execute(delete_query)
        cur.execute(query)
        return cur.query.decode("utf-8")

    def upload(
        self,
        filepath: str,
        table: str,
        index_col: Optional[str] = None,
        datetime_cols: Optional[List[str]] = None,
    ):
        """Uploads a CSV file as its own table in a Postgres DB

        Args:
            filepath (str): Path to CSV file
            table (str): Table name
            index_col (Optional[str], optional): Name of index column. Defaults to
            None, in which case a numerical index is used.
            datetime_cols (Optional[List[str]], optional): List of column names for
            "datetime" columns. Defaults to None.
        """
        cur = self.conn.cursor()
        self.create_table(filepath, table, index_col, datetime_cols)
        self.buffer.seek(0)
        # `copy_expert` lets us specify CSV formatting, which is important when columns
        # contain commas, or for keeping null values.
        cur.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT CSV)", self.buffer)
        self.conn.commit()

    def create_table_schema(
        self,
        filepath: str,
        index_col: Optional[str] = None,
        datetime_cols: Optional[List[str]] = None,
    ) -> Dict[str, str]:
        """Generates a Postgres schema based on the Pandas dtypes of an input CSV file

        Args:
            filepath (str): Path to CSV file
            index_col (Optional[str], optional): Name of index column. Defaults to
            None, in which case a numerical index is used.
            datetime_cols (Optional[List[str]], optional): List of column names for
            "datetime" columns. Defaults to None.

        Returns:
            Dict[str, str]: Mapping from column name to postgres type name
        """
        df = pd.read_csv(filepath, parse_dates=datetime_cols)
        if "Unnamed: 0" in df.columns:
            df.drop("Unnamed: 0", inplace=True, axis=1)
        if not index_col:
            df = df.reset_index()
            index_col = df.columns[0]
        else:
            df.insert(0, index_col, df.pop(index_col), allow_duplicates=True)

        df = df.infer_objects()
        df.to_csv(self.buffer, header=False, index=False)
        cols_map = self.map_sql_dtypes(df)
        return cols_map

    def map_sql_dtypes(self, df: pd.DataFrame) -> List[Tuple[str, str]]:
        """Given a dataframe, map each of its columns to the appropriate Postgres type

        Args:
            df (pd.DataFrame): Input dataframe

        Returns:
            Dict[str, str]: Mapping from column name to postgres type name
        """
        col_to_pgtype = []
        for col in df.columns:
            dtype = df[col].dtype.name

            if "time" in dtype and re.compile(r"\[.*\]$").search(dtype):
                dtype = re.sub(r"\[.*\]$", "", dtype)
            col_to_pgtype.append((col, self.py_2_sql_map[dtype]))

        return col_to_pgtype


In [137]:
from inspect import signature
from typing import Dict, List, Optional, Tuple
from io import StringIO
import numpy as np
import psycopg2 as ps
from psycopg2.extensions import connection
import psycopg2.sql as sql
import pandas as pd
from sqlalchemy.dialects import postgresql
from collections import defaultdict
import warnings
import re
import csv

class PostgresCSVUploader:

    def __init__(self, conn: connection):
        self.conn = conn
        self.data = None
        self.buffer = StringIO()
        self.py_2_sql_map = {
            'uint8': 'SMALLINT', 
            'uint16': 'SMALLINT', 
            'uint32': 'INTEGER', 
            'uint64': 'BIGINT', 
            'int': 'INTEGER', 
            'int8': 'SMALLINT', 
            'int16': 'INTEGER', 
            'int32': 'INTEGER', 
            'int64': 'BIGINT', 
            'complex128': 'VARCHAR', 
            'complex64': 'VARCHAR', 
            'str': 'VARCHAR', 
            'object': 'VARCHAR', 
            'category': 'VARCHAR', 
            'Decimal': 'NUMERIC', 
            'float': 'FLOAT', 
            'float16': 'FLOAT', 
            'float32': 'FLOAT', 
            'float64': 'DOUBLE PRECISION', 
            'datetime': 'TIMESTAMP', 
            'time': 'TIME', 
            'date': 'DATE', 
            'datetime64': 'DATE', 
            'bytes': 'BYTEA', 
            'void': 'BYTEA', 
            'bool': 'BOOLEAN', 
            'timedelta': 'INTERVAL', 
            'timedelta64': 'INTERVAL', 
            'list': 'ARRAY', 
            'dict': 'JSON',
            }

    
    @classmethod
    def from_new_connection(cls,
        host: str,
        user: str,
        password: str,
        port: str,
        database: Optional[str]
    ):
        conn = ps.connect(
            host=host,
            database=database if database else user,
            user=user,
            password=password,
            port=port
        )
        return cls(conn)
    
    def create_table(
        self, 
        filepath: str, 
        table: str,
        index_col: Optional[str] = None,
        datetime_cols: Optional[List[str]] = None
    ) -> None:
        cur = self.conn.cursor()
        columns = self.create_table_schema(filepath, index_col, datetime_cols)
        sanitized_cols = [sql.Identifier(c[0]).as_string(cur) + f" {c[1]}" for c in columns]
        sanitized_cols[0] += " PRIMARY KEY"
        delete_query = sql.SQL('DROP TABLE IF EXISTS {0};').format(sql.Identifier(table))
        query = sql.SQL(
            "CREATE TABLE {0} ({1});"
            ).format(
                sql.Identifier(table),
                sql.SQL(",".join(sanitized_cols))
            )
        print(query)
        cur.execute(delete_query)
        cur.execute(query)
        return cur.query.decode("utf-8")
    
    def upload(
        self,
        filepath: str,
        table: str,
        index_col: Optional[str] = None,
        datetime_cols: Optional[List[str]] = None,
    ):
        """Uploads a CSV file as its own table in a Postgres DB
        Args:
            filepath (str): Path to CSV file
            table (str): Table name
            index_col (Optional[str], optional): Name of index column. Defaults to None, in which case a numerical index is used.
            datetime_cols (Optional[List[str]], optional): List of column names for "datetime" columns. Defaults to None.
        """
        cur = self.conn.cursor()
        self.create_table(filepath, table, index_col, datetime_cols)
        self.buffer.seek(0)
#         cur.copy_from(self.buffer, table, sep=',')
        cur.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT CSV)", self.buffer)
        self.conn.commit()



    def create_table_schema(
        self,
        filepath: str,
        index_col: Optional[str] = None,
        datetime_cols: Optional[List[str]] = None
    ) -> Dict[str, str]:
        """Generates a Postgres schema based on the Pandas dtypes of an input CSV file
        Args:
            filepath (str): Path to CSV file
            index_col (Optional[str], optional): Name of index column. Defaults to None, in which case a numerical index is used.
            datetime_cols (Optional[List[str]], optional): List of column names for "datetime" columns. Defaults to None.
        Returns:
            Dict[str, str]: Mapping from column name to postgres type name
        """
        df = pd.read_csv(filepath, parse_dates=datetime_cols)
#         df.drop("description", axis=1, inplace=True)
#         df.drop("part_specs_mount_display_value", axis=1, inplace=True)
        if "Unnamed: 0" in df.columns: df.drop("Unnamed: 0", inplace=True, axis=1)
        if not index_col:
            df = df.reset_index()
            index_col = df.columns[0]
        else:
            df.insert(0, index_col, df.pop(index_col), allow_duplicates=True)
        
        df = df.infer_objects()
#         df.fillna(df.dtypes.replace({'float64': 0.0, 'O': 'NULL'}), inplace=True)
        df.to_csv(self.buffer, header=False, index=False) #, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        cols_map = self.map_sql_dtypes(df)
        return cols_map


    def map_sql_dtypes(self, df: pd.DataFrame) -> List[Tuple[str, str]]:
        """Given a dataframe, map each of its columns to the appropriate Postgres type 
        Args:
            df (pd.DataFrame): Input dataframe
        Returns:
            Dict[str, str]: Mapping from column name to postgres type name
        """
        col_to_pgtype = []
        for col in df.columns:
            dtype = df[col].dtype.name

            if 'time' in dtype and re.compile(r'\[.*\]$').search(dtype):
                dtype = re.sub(r'\[.*\]$', '', dtype)
            col_to_pgtype.append((col, self.py_2_sql_map[dtype]))
        return col_to_pgtype


In [144]:
from timeit import default_timer as timer

start = timer()

host = "ec2-34-233-115-14.compute-1.amazonaws.com"
port = 5432
database = "dfu56m15dkhh46"
user = "pgyrjmstmyerfk"
password = "228fcbba14e9d2bf362fcaa29cabe1106cc8dba00605f45ee25e810194309fd4"

conn = ps.connect(
    host=host,
    user=user,
    password=password,
    port=port,
    database=database
)

uploader = PostgresCSVUploader(conn)
uploader.upload(
    "csv/mica.csv",
    "mica"
)
# ...
end = timer()
print(f"\n\n\nTime elapsed {end - start}s") # Time in seconds, e.g. 5.38091952400282

Composed([SQL('CREATE TABLE '), Identifier('mica'), SQL(' ('), SQL('"index" BIGINT PRIMARY KEY,"_cache_id" VARCHAR,"description" VARCHAR,"part_category_id" BIGINT,"part_id" BIGINT,"part_manufacturer_id" BIGINT,"part_manufacturer_is_verified" BOOLEAN,"part_manufacturer_name" VARCHAR,"part_median_price_1000__cache_id" VARCHAR,"part_median_price_1000_converted_currency" VARCHAR,"part_median_price_1000_converted_price" DOUBLE PRECISION,"part_mpn" VARCHAR,"part_specs_capacitance_id" BIGINT,"part_specs_capacitance_display_value" VARCHAR,"part_specs_case_package_id" DOUBLE PRECISION,"part_specs_case_package_display_value" VARCHAR,"part_specs_depth_id" DOUBLE PRECISION,"part_specs_depth_display_value" VARCHAR,"part_specs_dielectricmaterial_id" DOUBLE PRECISION,"part_specs_dielectricmaterial_display_value" VARCHAR,"part_specs_features_id" DOUBLE PRECISION,"part_specs_features_display_value" VARCHAR,"part_specs_height_id" DOUBLE PRECISION,"part_specs_height_display_value" VARCHAR,"part_specs_hei

In [24]:
print(len(uploader.buffer.readline().strip('\n').split(',')))
print(len(uploader.sanitized_cols))
for i, val in enumerate(uploader.buffer.readline().split(',')): 
#     if val not in (list)(zip(uploader.buffer.readline().split(','), uploader.sanitized_cols)): 
    try:
        print(i, val, uploader.sanitized_cols[i])
    except: 
        print(i, val)

1


AttributeError: 'PostgresCSVUploader' object has no attribute 'sanitized_cols'

In [None]:
import PostgresCSVUploader

host = "ec2-34-233-115-14.compute-1.amazonaws.com"
port = 5432
database = "dfu56m15dkhh46"
user = "pgyrjmstmyerfk"
password = "228fcbba14e9d2bf362fcaa29cabe1106cc8dba00605f45ee25e810194309fd4"

conn = ps.connect(
    host=host,
    user=user,
    password=password,
    port=port,
    database=database
)
uploader = PostgresCSVUploader(conn)
uploader.upload(
    "csv/mica.csv",
    "mica"
)

### Splitting Column into Number and Unit

In [None]:
def split_spec(df: pd.DataFrame) -> df: pd.DataFrame:
    """
    Computes a split 
    """
        
    a = df
    for col in df.columns:
        
    
    # first go through all the columns
    # then somehow detect when we hit a column whose value includes
    

In [80]:
d = pd.read_csv('csv/mica_min.csv')
d.head()

Unnamed: 0,_cache_id,description,part_category_id,part_id,part_manufacturer_id,part_manufacturer_is_verified,part_manufacturer_name,part_median_price_1000__cache_id,part_median_price_1000_converted_currency,part_median_price_1000_converted_price,...,part_specs_weight_display_value,part_specs_workingvoltage_id,part_specs_workingvoltage_display_value,part_specs_ripplecurrent_id,part_specs_ripplecurrent_display_value,part_specs_dielectric_id,part_specs_dielectric_display_value,part_median_price_1000,part_specs_esr_equivalentseriesresistance__id,part_specs_esr_equivalentseriesresistance__display_value
0,US-USD-133323,"Capacitor, Mica, Cap 100pF, Tol 5%, Radial DIP...",6334,133323,87,False,Cornell Dubilier,US-USD-133323-1000-0.991000,USD,0.991,...,,,,,,,,,,
1,US-USD-887085,"Capacitor, Mica, Cap 100pF, Tol 1%, Radial DIP...",6334,887085,87,False,Cornell Dubilier,US-USD-887085-1000-1.530000,USD,1.53,...,,,,,,,,,,
2,US-USD-123206,Capacitor; Mica; Cap 100pF; Tol 5%; Radial Min...,6334,123206,87,False,Cornell Dubilier,US-USD-123206-1000-2.500000,USD,2.5,...,,,,,,,,,,
3,US-USD-153098,Capacitor; Mica; Cap 100pF; Tol 5%; Radial DIP...,6334,153098,87,False,Cornell Dubilier,US-USD-153098-1000-1.908900,USD,1.9089,...,,,,,,,,,,
4,US-USD-136131,Capacitor; Mica; Cap 100 pF; Tol 5%; Vol-Rtg 5...,6334,136131,87,False,Cornell Dubilier,US-USD-136131-1000-1.880000,USD,1.88,...,26.988746 mg,,,,,,,,,


In [136]:
import pint
for row in d['part_specs_weight_display_value']:
    ureg = pint.UnitRegistry()
    quantity = ureg(str(row))
    if type(quantity) is not float:
        print("Value:", quantity.magnitude, "Units: ", quantity.units)

Value: 26.988746 Units:  milligram
Value: 15.989131 Units:  milligram
Value: 15.989131 Units:  milligram


In [None]:
PART_SEARCH_QUERY = """
query PricesViewSearch($country: String!, $currency: String!, $filters: Map, $in_stock_only: \
  Boolean, $limit: Int!, $q: String, $sort: String, $sort_dir: SortDirection, $start: Int) {
  search(country: $country, currency: $currency, filters: $filters, in_stock_only: $in_stock_only, \
    limit: $limit, q: $q, sort: $sort, sort_dir: $sort_dir, start: $start) {
    applied_category {
      ancestors {
        id
        name
        path
        __typename
      }
      id
      name
      path
      __typename
    }
    applied_filters {
      display_values
      name
      shortname
      values
      __typename
    }
    results {
      _cache_id
      description
      part {
        _cache_id
        best_datasheet {
          url
          __typename
        }
        best_image {
          url
          __typename
        }
        category {
          id
          __typename
        }
        counts
        descriptions {
          text
          __typename
        }
        id
        manufacturer {
          id
          is_verified
          name
          __typename
        }
        manufacturer_url
        median_price_1000 {
          _cache_id
          converted_currency
          converted_price
          __typename
        }
        mpn
        specs {
          attribute {
            id
            name
            shortname
            __typename
          }
          display_value
          __typename
        }
        __typename
      }
      __typename
    }
    hits
    __typename
  }
}
"""
