In [None]:
import sys
# Make the parent directory importable; the project-local `common` module imported
# in the next cell is presumably located there — TODO confirm.
sys.path.append('../')

In [None]:
import pandas as pd 
from pathlib import Path
import re
import pyspark.sql.types as T
from math import ceil
import warnings
from dataclasses import dataclass
from typing import Type, Any, Union
from collections import defaultdict
import pickle
from common import get_datestr
from tqdm import tqdm

In [None]:
%run Shared/Init

In [None]:
def load_hubspot_metadata(sheet_name):
    """Read one sheet of the HubSpot properties reference workbook as metadata.

    The workbook is copied from below ``APPLICATION_PATH / 'hubspot'`` to a
    local path and then parsed with pandas. No column is filtered out: every
    column except 'Internal name' ends up in the metadata of its row.

    Args:
        sheet_name: passed through to ``pd.read_excel`` as ``sheet_name``.

    Returns:
        defaultdict keyed by the row's 'Internal name' value. Each entry maps
        the remaining column names (spaces replaced by underscores,
        lower-cased) to the stringified cell value. Unknown keys yield ``{}``.
    """
    local_copy = Path('')
    remote_reference = APPLICATION_PATH / 'hubspot' / ''

    # copy the remote workbook to the local file system so pandas can open it
    remote_reference.cp('file:' + str(local_copy))

    reference_df = pd.read_excel(local_copy, sheet_name=sheet_name)

    metadata = defaultdict(dict)

    for _, row in reference_df.iterrows():
        columns = row.index.values.tolist()
        columns.remove('Internal name')

        metadata[row['Internal name']] = {
            column.replace(' ', '_').lower(): str(row[column])
            for column in columns
        }

    return metadata

def add_custom_metadata(metadata, property_type_counts, property_schemas):
    """Attach debugging information to the per-property metadata, in place.

    For every property that occurs in either input, the stringified type
    counts and/or the stringified property schema are stored under the
    'type_counts' and 'property_schema' metadata keys.

    Args:
        metadata: mapping property name -> metadata dict; it is indexed with
            every property name, so it must provide an entry for each of them
            (e.g. a defaultdict).
        property_type_counts: mapping property name -> mapping of type counts.
        property_schemas: mapping property name -> schema object.

    Raises:
        AssertionError: if an entry already carries one of the two keys.
    """
    for name in set(property_type_counts) | set(property_schemas):
        entry = metadata[name]

        # never overwrite values that came from the reference workbook
        assert 'type_counts' not in entry
        assert 'property_schema' not in entry

        if name in property_type_counts:
            # convert to a plain dict first so the string has no defaultdict wrapper
            entry['type_counts'] = str(dict(property_type_counts[name]))

        if name in property_schemas:
            entry['property_schema'] = str(property_schemas[name])

In [None]:
def load_json_queue(object_name):
    """List all raw JSON files of one HubSpot object together with their load time.

    Every folder below ``RAW_PATH / 'hubspot' / object_name`` is scanned for
    files matching ``.*\\.json``. The load time of a file is parsed from the
    name of its folder, which must look like ``YYYYMMDD`` or ``YYYYMMDD_HHMMSS``.

    Args:
        object_name: name of the sub-folder below the raw 'hubspot' folder.

    Returns:
        List of dicts with the keys 'path' and 'load_datetime'.

    Raises:
        ValueError: if a folder containing JSON files has an unparsable name.
    """
    object_root = RAW_PATH / 'hubspot' / object_name

    date_only_pattern = r'^\d{8}$'
    date_time_pattern = r'^\d{8}_\d{6}$'

    json_file_queue = []

    for folder in object_root.iterdir():
        for json_file in folder.reiterdir(r'.*\.json'):
            # the folder name is only validated once a JSON file was found in it
            if re.match(date_only_pattern, folder.name):
                loaded_at = datetime.strptime(folder.name, '%Y%m%d')
            elif re.match(date_time_pattern, folder.name):
                loaded_at = datetime.strptime(folder.name, '%Y%m%d_%H%M%S')
            else:
                raise ValueError(f'unparsable folder name: {folder.name}')

            json_file_queue.append({
                'path': json_file,
                'load_datetime': loaded_at,
            })

    return json_file_queue



In [None]:
def collect_properties(data):
    """Pivot a list of JSON records into per-column value lists.

    Top-level keys of a record (everything except 'properties') are stored
    under the name ``meta-<key>`` so they cannot be confused with the entries
    of the record's 'properties' dict, which keep their own names. Values that
    are dicts are treated as nested properties and replaced by their 'value'
    entry.

    Args:
        data: iterable of record dicts, each with a 'properties' dict.

    Returns:
        defaultdict mapping column name -> list of collected values.

    Raises:
        KeyError: if a record has no 'properties' key or a nested property has
            no 'value' key (the offending value is printed before re-raising).
    """
    columns = defaultdict(list)

    def unwrap(value):
        # flat properties are used as they are
        if not isinstance(value, dict):
            return value

        # nested properties carry the actual value under 'value'
        try:
            return value['value']
        except BaseException:
            print(value)
            raise

    for record in data:
        for key, value in record.items():
            if key == 'properties':
                continue
            columns[f'meta-{key}'].append(unwrap(value))

        for key, value in record['properties'].items():
            columns[key].append(unwrap(value))

    return columns

# a property value that is the empty string is counted as null
is_null = re.compile(r'^$')
# exactly 13 digits are counted as a timestamp (parsed elsewhere as epoch milliseconds)
is_timestamp = re.compile(r'^\d{13}$')

# integers with a larger magnitude do not safely fit a java long and are kept as strings
_MAX_LONG_MAGNITUDE = 9.0e+18


def _classify_property_value(value):
    """Return the type label ('null', 'timestamp', 'int', 'float', 'bool' or 'str') of one raw property value."""
    if value is None or is_null.match(value):
        return 'null'

    if is_timestamp.match(value):
        return 'timestamp'

    try:
        number = int(value)
    except ValueError:
        pass
    else:
        # out of bounds in either direction -> cannot be stored as a long
        return 'str' if abs(number) > _MAX_LONG_MAGNITUDE else 'int'

    try:
        float(value)
    except ValueError:
        pass
    else:
        return 'float'

    return 'bool' if value in ('true', 'false') else 'str'


def count_types_in_single_file(key, values, is_meta):
    """Count which data types occur among the values of one column of one file.

    Each data type is described by a string to keep further processing simple.
    Meta properties are typed by their python type (lists as ``list<item type>``,
    lists without any item as ``list<unknown>``). Regular properties must be
    strings or None; their type is guessed from the string content.

    Args:
        key: column name, only used in error messages.
        values: list of all values collected for the column.
        is_meta: True for top-level (meta) properties, False for entries of
            the 'properties' dict.

    Returns:
        defaultdict mapping type label -> number of values. Empty when
        ``values`` is empty.

    Raises:
        ValueError: if the values mix several python types, or a meta list
            column mixes several item types.
        TypeError: if a regular property contains values that are neither
            str nor None.
    """
    python_types = {type(value) for value in values}
    python_types_text = {t.__name__ for t in python_types}

    if len(python_types) > 1 and 'NoneType' not in python_types_text:
        raise ValueError(f'cannot parse {key} because it contains multiple python data types {python_types_text=}')

    type_counts = defaultdict(lambda: 0)

    if is_meta:
        if not python_types:
            # nothing collected, nothing to count
            return type_counts

        if len(python_types) > 1:
            # only reachable when None is mixed with one other type
            raise ValueError(f'cannot parse meta property {key} because it mixes None with other values {python_types_text=}')

        dtype = next(iter(python_types))

        if dtype == list:
            item_types = {type(elem) for item in values for elem in item}

            if len(item_types) == 0:
                type_counts['list<unknown>'] = len(values)
            elif len(item_types) == 1:
                item_dtype = next(iter(item_types))
                type_counts[f'list<{item_dtype.__name__}>'] = len(values)
            else:
                item_types_text = {t.__name__ for t in item_types}
                raise ValueError(f'cannot parse {key} because its lists contain multiple python data types {item_types_text=}')
        else:
            type_counts[dtype.__name__] = len(values)

    else:
        if len(python_types_text - {'str', 'NoneType'}) > 0:
            raise TypeError(f'cannot parse property {key} because it contains non-string values. {python_types_text=}')

        for value in values:
            type_counts[_classify_property_value(value)] += 1

    return type_counts

def count_types(queue):
    """Count the data types of every column across all files of the queue.

    Each file is loaded, pivoted into columns and typed on its own; the
    per-file counts are summed up per column and type label.

    Args:
        queue: list of queue records whose 'path' entry offers ``read_text()``.

    Returns:
        Nested defaultdict: column name -> type label -> total count.
    """
    totals = defaultdict(lambda: defaultdict(lambda: 0))

    for queue_record in tqdm(queue):
        records = json.loads(queue_record['path'].read_text())

        for name, values in collect_properties(records).items():
            file_counts = count_types_in_single_file(name, values, name.startswith('meta-'))

            # fold the counts of this file into the running totals
            for type_label, count in file_counts.items():
                totals[name][type_label] += count

    return totals

In [None]:
# helper class to hold all information about a single column to be parsed from hubspot
@dataclass
class PropertySchema:
    """Schema of a single HubSpot column."""
    dtype: str # int, float, str, bool, list, datetime, date
    is_meta: bool # meta-properties are top-level properties, regular properties occur inside 'properties' key
    ignore_property: bool = False # allows filtering out unparsable columns without crashing the pipeline by raising an Error


# the only list types a meta property may have
_SUPPORTED_META_LIST_TYPES = ('list<int>', 'list<str>', 'list<float>', 'list<bool>')


def _unknown_schema_or_raise(message, is_meta, raise_unknown_types):
    """Raise ValueError(message), or return an ignorable 'unknown' schema when raising is disabled."""
    if raise_unknown_types:
        raise ValueError(message)

    return PropertySchema('unknown', is_meta=is_meta, ignore_property=True)


def _infer_meta_data_type(name, type_counts, raise_unknown_types):
    """Infer the schema of a meta property: a single scalar type or one supported list type."""
    message = f'cannot infer data type of meta property {name} with type counts: {type_counts}'

    nonzero_keys = [key for key, count in type_counts.items() if count > 0]

    if len(nonzero_keys) == 0:
        raise ValueError('invalid state, find a bug above')

    if len(nonzero_keys) > 2:
        raise ValueError(message)

    if len(nonzero_keys) == 2:
        # two labels are only acceptable as 'empty lists' + 'lists of one item type'
        if 'list<unknown>' not in nonzero_keys:
            raise ValueError(message)

        nonzero_keys.remove('list<unknown>')
        dtype = nonzero_keys[0]

        if not dtype.startswith('list<'):
            raise ValueError(message)

        if dtype not in _SUPPORTED_META_LIST_TYPES:
            return _unknown_schema_or_raise(message, True, raise_unknown_types)

        return PropertySchema(dtype, is_meta=True)

    dtype = nonzero_keys[0]

    # only empty lists were seen, or lists of an unsupported item type
    if dtype == 'list<unknown>' or (dtype.startswith('list') and dtype not in _SUPPORTED_META_LIST_TYPES):
        return _unknown_schema_or_raise(message, True, raise_unknown_types)

    return PropertySchema(dtype, is_meta=True)


def _infer_property_data_type(name, type_counts, raise_unknown_types):
    """Infer the schema of a regular property from the counts of its guessed value types."""

    def has(key):
        # .get() neither inserts zero entries into a defaultdict nor fails on a plain dict
        return type_counts.get(key, 0) > 0

    def has_any_other_than(key):
        return any(count > 0 for other, count in type_counts.items() if other != key)

    if has('str'):
        return PropertySchema('str', is_meta=False)

    if has('bool'):
        # NOTE(review): null entries also count as "other" here, so a bool column
        # with nulls becomes str — confirm whether that is intended
        if has_any_other_than('bool'): # inconsistent entries, cannot be cleanly parsed
            return PropertySchema('str', is_meta=False)

        return PropertySchema('bool', is_meta=False)

    if has('float'):
        # timestamp, int & null entries are fine and parsable
        return PropertySchema('float', is_meta=False)

    if has('int'):
        # timestamp & null entries are fine and parsable
        return PropertySchema('int', is_meta=False)

    if has('timestamp'):
        # null entries are fine and parsable
        return PropertySchema('timestamp', is_meta=False)

    message = f'Unable to infer valid type for {name} from {type_counts}'

    if has('null'):
        # nothing but nulls was seen, there is no type to infer
        return _unknown_schema_or_raise(message, False, raise_unknown_types)

    raise ValueError(message)


def infer_data_type(name, type_counts, is_meta, raise_unknown_types=True):
    """Decide which data type a column should be parsed as, based on its type counts.

    Args:
        name: column name, only used in error messages.
        type_counts: mapping type label -> count; it is not modified.
        is_meta: True for top-level (meta) properties.
        raise_unknown_types: when False, columns whose type cannot be
            determined (only nulls, only empty lists, unsupported list item
            types) yield an 'unknown' schema flagged with ``ignore_property``
            instead of raising.

    Returns:
        PropertySchema for the column.

    Raises:
        ValueError: if the counts are contradictory or empty, or if the type
            is unknown and ``raise_unknown_types`` is True.
    """
    if is_meta:
        return _infer_meta_data_type(name, type_counts, raise_unknown_types)

    return _infer_property_data_type(name, type_counts, raise_unknown_types)


def infer_property_schemas(property_type_counts):
    """Infer a schema for every column from its type counts.

    Columns whose name starts with 'meta-' are treated as meta properties.
    Columns whose type cannot be determined do not raise; they are returned
    with a schema flagged as ignorable.

    Args:
        property_type_counts: mapping column name -> type counts.

    Returns:
        Dict mapping column name -> inferred property schema.
    """
    return {
        name: infer_data_type(
            name,
            type_counts,
            name.startswith('meta-'),
            raise_unknown_types=False,
        )
        for name, type_counts in property_type_counts.items()
    }


def remove_ignored_property_schemas(property_schemas):
    """Delete all schemas flagged with ``ignore_property`` from the dict, in place.

    Every removed column is reported on stdout, followed by the number of
    columns that remain.

    Args:
        property_schemas: dict mapping column name -> schema object with an
            ``ignore_property`` attribute.
    """
    ignored_names = [
        name
        for name in property_schemas
        if property_schemas[name].ignore_property
    ]

    for name in ignored_names:
        print(f'{name}: IGNORING PROPERTY AND DELETING IT FROM SCHEMA')
        del property_schemas[name]

    print(f'{len(property_schemas)} properties left')


def save_property_schemas(property_schemas, path):
    """Pickle the property schemas and store the bytes via ``path.write_bytes``.

    Args:
        property_schemas: object to serialize.
        path: target offering a ``write_bytes(bytes)`` method.
    """
    serialized = pickle.dumps(property_schemas)
    path.write_bytes(serialized)

In [None]:
def parse_dtype(dtype_str):
    """Translate a type label into the matching pyspark data type instance.

    ``list<X>`` becomes an ArrayType of the translated ``X``; 'int', 'float',
    'str', 'bool' and 'timestamp' map to LongType, DoubleType, StringType,
    BooleanType and TimestampType.

    Args:
        dtype_str: type label.

    Returns:
        A new pyspark data type instance.

    Raises:
        NotImplementedError: for any other label (the label is the message).
    """
    list_match = re.match('^list<(.*)>$', dtype_str)
    if list_match:
        return T.ArrayType(parse_dtype(list_match[1]))

    # the pyspark class is looked up by name only after the label is known to be supported
    spark_type_names = {
        'int': 'LongType',
        'float': 'DoubleType',
        'str': 'StringType',
        'bool': 'BooleanType',
        'timestamp': 'TimestampType',
    }

    if dtype_str not in spark_type_names:
        raise NotImplementedError(dtype_str)

    return getattr(T, spark_type_names[dtype_str])()

def build_pyspark_schema(property_schemas, metadata):
    """Build the pyspark StructType used for the parsed rows.

    One nullable field is created per non-ignored property, in the order of
    ``property_schemas``, followed by the non-nullable ``__load_timestamp``
    field.

    Args:
        property_schemas: mapping column name -> schema with ``dtype`` and
            ``ignore_property`` attributes.
        metadata: mapping column name -> field metadata dict; columns without
            an entry get empty metadata.

    Returns:
        pyspark StructType.
    """
    # work on a copy with a default so the lookup never fails and the input stays untouched
    metadata_by_name = defaultdict(lambda: {})
    metadata_by_name.update(metadata)

    property_fields = [
        T.StructField(name, parse_dtype(schema.dtype), True, metadata_by_name[name])
        for name, schema in property_schemas.items()
        if not schema.ignore_property
    ]

    load_timestamp_field = T.StructField('__load_timestamp', T.TimestampType(), False)

    return T.StructType(property_fields + [load_timestamp_field])

In [None]:
def parse_meta_property(data, dtype):
    """Convert the raw value of a meta (top-level) property according to its type label.

    Args:
        data: raw value from the record; a dict carrying a 'value' entry is
            replaced by that entry first, mirroring how the values are
            collected for type inference.
        dtype: type label of the column.

    Returns:
        The value unchanged for scalar and list types; a datetime for
        'timestamp' (epoch milliseconds), or None if the timestamp is None.

    Raises:
        TypeError: for an unknown type label.
    """
    if isinstance(data, dict) and 'value' in data:
        data = data['value']

    if dtype in ['int', 'float', 'str', 'bool']:
        return data
    elif dtype.startswith('list<'):
        return data
    elif dtype == 'timestamp':
        if data is None:
            return None
        return datetime.fromtimestamp(data / 1000)
    else:
        raise TypeError(f'unknown schema {dtype=} for meta property')


def parse_property(data, dtype):
    """Convert the raw string value of a regular property according to its type label.

    Args:
        data: raw value; a dict is replaced by its 'value' entry first.
        dtype: type label of the column ('str', 'timestamp', 'int', 'float'
            or 'bool').

    Returns:
        None for None or the empty string, otherwise the converted value
        ('timestamp' is read as epoch milliseconds, 'bool' is True only for
        the string 'true').

    Raises:
        TypeError: for an unknown type label on a non-null value.
    """
    if isinstance(data, dict):
        data = data['value']
    if data is None or data == '':
        return None
    elif dtype == 'str':
        return data
    elif dtype == 'timestamp':
        return datetime.fromtimestamp(int(data) / 1000)
    elif dtype == 'int':
        return int(data)
    elif dtype == 'float':
        return float(data)
    elif dtype == 'bool':
        return data == 'true'
    else:
        raise TypeError(f'unknown {dtype=} for property')


def parse_json(data, property_schemas, out_rows, row_suffix):
    """Turn the records of one JSON file into rows and append them to ``out_rows``.

    Each row holds one value per entry of ``property_schemas`` (in that
    order, None when the record lacks the column) followed by ``row_suffix``.

    Meta columns are named ``meta-<key>`` in the schema while the records
    store them under the bare ``<key>``, so the prefix is stripped for the
    lookup. A record key that literally equals the schema name still wins.

    Args:
        data: iterable of record dicts.
        property_schemas: mapping column name -> schema with ``is_meta`` and
            ``dtype`` attributes.
        out_rows: list the rows are appended to.
        row_suffix: list of values appended to every row.
    """
    meta_prefix = 'meta-'

    # resolve the record key of every column once instead of once per record
    columns = []
    for key, schema in property_schemas.items():
        record_key = key
        if schema.is_meta and key.startswith(meta_prefix):
            record_key = key[len(meta_prefix):]
        columns.append((key, record_key, schema))

    for record in data:
        row = []
        for key, record_key, schema in columns:
            if schema.is_meta:
                if key in record:
                    row.append(parse_meta_property(record[key], schema.dtype))
                elif record_key in record:
                    row.append(parse_meta_property(record[record_key], schema.dtype))
                else:
                    row.append(None)
            else:
                if key in record['properties']:
                    row.append(parse_property(record['properties'][key], schema.dtype))
                else:
                    row.append(None)

        out_rows.append(row + row_suffix)


def parse_jsons(queue, property_schemas, pyspark_schema, intermediate_path, chunk_size=20000):
    """Parse all JSON files of the queue and write the rows to an intermediate parquet.

    Rows are buffered in memory and written in chunks. The first chunk
    overwrites whatever exists at ``intermediate_path``, later chunks are
    appended. Something is always written, even for an empty queue, so the
    intermediate parquet exists afterwards.

    Args:
        queue: list of queue records with 'path' (offering ``read_text()``)
            and 'load_datetime' entries.
        property_schemas: column schemas handed to ``parse_json``.
        pyspark_schema: schema used to create the Spark DataFrames.
        intermediate_path: target location, converted with ``str()``.
        chunk_size: the buffer is flushed once it holds more rows than this.
    """
    target = str(intermediate_path)
    chunks_written = 0

    def write_chunk(rows):
        # the first write replaces leftovers of previous runs, later ones add to it
        nonlocal chunks_written
        mode = 'overwrite' if chunks_written == 0 else 'append'

        df = spark.createDataFrame(rows, schema=pyspark_schema).repartition(1)
        df.write.mode(mode).parquet(target)

        chunks_written += 1

    out_data = []

    for queue_data in tqdm(queue):
        data = json.loads(queue_data['path'].read_text())

        row_suffix = [queue_data['load_datetime']] # appended to every row
        parse_json(data, property_schemas, out_data, row_suffix)

        if len(out_data) > chunk_size:
            write_chunk(out_data)
            out_data = []

    # flush the remainder; an empty remainder is only written when nothing was written yet
    if out_data or chunks_written == 0:
        write_chunk(out_data)

In [None]:
def deduplicate_and_finalize(intermediate_path, out_path):
    """Deduplicate the intermediate parquet and write the final parquet.

    Rows that are identical in every column except ``__load_timestamp`` are
    reduced to the one with the earliest load timestamp. The result is
    repartitioned to roughly 150 * 50000 fields per partition (at least one
    partition) and written with mode 'overwrite'.

    Args:
        intermediate_path: location of the intermediate parquet.
        out_path: location of the final parquet.
    """
    fields_per_partition = 150 * 50000.0 # cols * rows
    load_meta_columns = ['__load_timestamp']
    deduplication_sort_cols = ['__load_timestamp']

    df = spark.read.parquet(str(intermediate_path)).cache()
    cached_deduplicated_df = None

    try:
        deduplication_cols = [col for col in df.columns if col not in load_meta_columns]

        deduplication_group_window = (
            Window
            .partitionBy(deduplication_cols)
            .orderBy(deduplication_sort_cols)
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )

        num_before_deduplication = df.count()

        cached_deduplicated_df = (
            df
            .withColumn('duplicate_index', F.row_number().over(deduplication_group_window))
            .where('duplicate_index == 1')
            .drop('duplicate_index')
            .cache()
        )

        num_after_deduplication = cached_deduplicated_df.count()

        # repartition() rejects 0 partitions, which the formula yields for an empty result
        num_partitions = max(
            1,
            int(ceil(len(cached_deduplicated_df.columns) * 1.0 * num_after_deduplication / fields_per_partition)),
        )

        cached_deduplicated_df.repartition(num_partitions).write.mode('overwrite').parquet(str(out_path))
    finally:
        # release the cached data, this function is called once per object
        if cached_deduplicated_df is not None:
            cached_deduplicated_df.unpersist()
        df.unpersist()

    print(f'deduplicated from {num_before_deduplication} to {num_after_deduplication}, into {num_partitions=}')

In [None]:
def cleanup(intermediate_path):
    """Remove the intermediate data by calling ``rm(recurse=True)`` on the path.

    Args:
        intermediate_path: path object offering an ``rm`` method.
    """
    remove = intermediate_path.rm
    remove(recurse=True)

In [None]:
def standardize(object_name, metadata_sheet, datestr):
    """Run the whole standardization pipeline for one HubSpot object.

    Steps: load the reference metadata, list the raw JSON files, count the
    types of every column, infer and save the column schemas, drop ignored
    columns, build the pyspark schema, parse the JSON files into an
    intermediate parquet, deduplicate into the final parquet and remove the
    intermediate data. All outputs go below
    ``STANDARDIZED_PATH / 'hubspot' / object_name / datestr``.

    Args:
        object_name: name of the HubSpot object, used as folder name.
        metadata_sheet: sheet of the reference workbook describing the object.
        datestr: folder name of this run.
    """
    run_path = STANDARDIZED_PATH / 'hubspot' / f'{object_name}' / datestr

    intermediate_path = run_path / '_temp_intermediate_data'
    output_path = run_path / 'data'
    property_schemas_path = run_path / 'property_schemas.pkl'

    print(f'\n\n\n---{object_name}: ⏳ STARTING ⏳ ---')
    print(f'---{object_name}: LOADING METADATA---')
    metadata = load_hubspot_metadata(metadata_sheet)
    print(f'---{object_name}: LOADING QUEUE---')
    queue = load_json_queue(object_name)
    print(f'---{object_name}: COUNTING PROPERTY TYPES---')
    property_type_counts = count_types(queue)
    print(f'---{object_name}: INFERRING PROPERTY SCHEMA---')
    property_schemas = infer_property_schemas(property_type_counts)
    print(f'---{object_name}: INCLUDING ADDITIONAL METADATA---')
    add_custom_metadata(metadata, property_type_counts, property_schemas)
    # the schemas are saved before ignored properties are removed, so the file documents them too
    print(f'---{object_name}: SAVING PROPERTY SCHEMA---')
    save_property_schemas(property_schemas, property_schemas_path)
    print(f'---{object_name}: REMOVING IGNORED PROPERTIES---')
    remove_ignored_property_schemas(property_schemas)
    print(f'---{object_name}: BUILDING PYSPARK SCHEMA---')
    pyspark_schema = build_pyspark_schema(property_schemas, metadata)
    print(f'---{object_name}: PARSING JSONS INTO INTERMEDIATE PARQUET---')
    parse_jsons(queue, property_schemas, pyspark_schema, intermediate_path)
    print(f'---{object_name}: DEDUPLICATION INTO FINAL PARQUET---')
    deduplicate_and_finalize(intermediate_path, output_path)
    print(f'---{object_name}: CLEANUP INTERMEDIATE PARQUET---')
    cleanup(intermediate_path)
    print(f'---{object_name}: ✅ SUCCESFULLY FINISHED ✅ ---\n\n\n')

In [None]:
# Run identifier handed to every standardize() call, which uses it as a folder name.
datestr = get_datestr()  # previously: datetime.now().strftime('%Y%m%d')

# (object_name, metadata_sheet) pairs: the object's raw folder name and the
# sheet of the reference workbook that describes its properties.
config = [
    ('contacts', 'CONTACT'),
    ('companies', 'COMPANY'),
    ('input_material_streams', 'input_material_streams'),
    ('output_material_streams', 'output_material_streams'),
]

# Standardize the objects one after another, in the order listed above.
for object_name, metadata_sheet in config:
    standardize(object_name, metadata_sheet, datestr)

In [None]:
# object_name = 'input_material_streams'
# metadata_sheet = 'input_material_streams'
# datestr = datetime.now().strftime('%Y%m%d')

# # standardized -> standardized_testing, just to be safe
# intermediate_path = 
# output_path = 
# property_schemas_path = 

# print(f'---{object_name}: ⏳ STARTING ⏳ ---')
# print(f'---{object_name}: LOADING METADATA---')
# metadata = load_hubspot_metadata(metadata_sheet)
# print(f'---{object_name}: LOADING QUEUE---')
# queue = load_json_queue(object_name)[-20:]
# print(f'---{object_name}: COUNTING PROPERTY TYPES---')
# property_type_counts = count_types(queue)
# print(f'---{object_name}: INFERRING PROPERTY SCHEMA---')
# property_schemas = infer_property_schemas(property_type_counts)
# print(f'---{object_name}: INCLUDING ADDITIONAL METADATA---')
# add_custom_metadata(metadata, property_type_counts, property_schemas)
# print(f'---{object_name}: SAVING PROPERTY SCHEMA---')
# save_property_schemas(property_schemas, property_schemas_path)
# print(f'---{object_name}: REMOVING IGNORED PROPERTIES---')
# remove_ignored_property_schemas(property_schemas)
# print(f'---{object_name}: BUILDING PYSPARK SCHEMA---')
# pyspark_schema = build_pyspark_schema(property_schemas, metadata)
# print(f'---{object_name}: PARSING JSONS INTO INTERMEDIATE PARQUET---')
# parse_jsons(queue, property_schemas, pyspark_schema, intermediate_path)
# print(f'---{object_name}: DEDUPLICATION INTO FINAL PARQUET---')
# deduplicate_and_finalize(intermediate_path, output_path)
# print(f'---{object_name}: CLEANUP INTERMEDIATE PARQUET---')
# cleanup(intermediate_path)
# print(f'---{object_name}: ✅ SUCCESFULLY FINISHED ✅ ---')

In [None]:
# df = spark.read.parquet(str(output_path)).cache()
# display(df)

In [None]:
# props = collect_properties(json.loads(queue[0]['path'].read_text()))

In [None]:
# data = json.loads(queue[0]['path'].read_text())

In [None]:
from copy import copy

def inspect_queue(queue, print_first_n=1, reversed_order=False):
    """
    Helper function for seeing formatted json contents of the queue. It prints up to
    ``print_first_n`` json records and loads only as many json files from the queue
    as necessary to do so. The queue passed in is not modified.

    queue: list of records with path to jsons
    print_first_n: number of records to print
    reversed_order: when True, files and the records inside them are taken from the end
        instead of from the start
    """
    print(f'---Inspecting queue with {len(queue)} queue records---\n')

    take_from = -1 if reversed_order else 0

    pending_files = copy(queue)
    pending_records = []
    remaining = print_first_n

    while remaining > 0 and (len(pending_records) > 0 or len(pending_files) > 0):
        # keep loading files until one of them contains records or no file is left
        while len(pending_records) == 0 and len(pending_files) > 0:
            queue_record = pending_files.pop(take_from)
            pending_records = json.loads(queue_record['path'].read_text())

        # still nothing to show: the loop above can only have stopped because the files ran out
        if len(pending_records) == 0:
            print("Couldn't find sufficient records, ending inspect.")
            return

        record = pending_records.pop(take_from)
        source = queue_record["path"]

        print(f'Record from {source}:\n')
        print(json.dumps(record, indent=4))
        print('\n\n')

        remaining -= 1

In [None]:
# inspect_queue(queue, 5, True)

In [None]:
from pyspark.sql import Row

def display_schema(df, hide_options=True):
    """Display the schema of a DataFrame as a table with one row per column.

    The table has the columns ``column_name``, ``data_type``, ``nullable``
    and ``metadata``.

    Args:
        df: DataFrame whose schema is shown.
        hide_options: when True, the 'options' entry is left out of the
            displayed metadata. The metadata of ``df`` itself is not changed.
    """
    schema_info = []

    for field in df.schema.fields:
        # work on a copy: the dict belongs to the schema of df and must not be altered
        field_metadata = dict(field.metadata)

        if hide_options:
            field_metadata.pop('options', None)

        schema_info.append(
            Row(
                column_name=field.name,
                data_type=str(field.dataType),
                nullable=field.nullable,
                metadata=field_metadata,
            )
        )

    schema_df = spark.createDataFrame(schema_info)
    display(schema_df)

In [None]:
# display_schema(df)

In [None]:
# def metadata(df, col):
#     print(f'metadata for df["{col}"]:')
#     print(json.dumps(df.schema[col].metadata, indent=4))

# metadata(df, 'name')