In [115]:
import requests
import pandas 
import json
from flatten_json import flatten
import os
from dotenv import load_dotenv

# Load environment variables (the API credentials read further down) from a
# local .env file via python-dotenv.
load_dotenv()

True

In [116]:
# API credentials, read from the process environment; each is None when the
# corresponding variable is not set.
TAP_STARTUP_ALBERTA_TOKEN = os.environ.get("TAP_STARTUP_ALBERTA_TOKEN")
TAP_STARTUP_ALBERTA_APP_ID = os.environ.get("TAP_STARTUP_ALBERTA_APP_ID")

In [117]:
"""
The purpose of this module is to convert a JSON schema into a BigQuery schema.
"""

from google.cloud.bigquery import SchemaField
import re

# Bookkeeping columns that build_schema() appends to a table when
# add_metadata is true. Both are nullable date-time strings on the JSON side
# and use the BigQuery type named under "bq_type".
METADATA_FIELDS = {
    metadata_column: {
        "type": ["null", "string"],
        "format": "date-time",
        "bq_type": "timestamp",
    }
    for metadata_column in ("_time_extracted", "_time_loaded")
}


def cleanup_record(schema, record):
    """
    Recursively rewrite every dictionary key of a record into a valid BigQuery field name.

    Lists are rebuilt element by element, dictionaries get their keys passed
    through create_valid_bigquery_field_name(), and any other value is
    returned as is.

    :param schema: JSON schema of the stream; it is only forwarded to the recursive calls
    :param record: JSON record (or any nested value of it) generated by the tap
    :return: a new structure with the same values and cleaned up field names
    """
    if isinstance(record, list):
        return [cleanup_record(schema, element) for element in record]

    if isinstance(record, dict):
        return {
            create_valid_bigquery_field_name(name): cleanup_record(schema, content)
            for name, content in record.items()
        }

    # scalars (str, int, float, bool, None, ...) need no clean-up
    return record


def create_valid_bigquery_field_name(field_name):
    """
    Clean up / prettify a field name so that it matches BigQuery naming conventions.

    Fields must:
        • contain only
            -letters,
            -numbers, and
            -underscores,
        • start with a
            -letter or
            -underscore, and
        • be at most 300 characters long

    Note: the check uses str.isalnum(), which is Unicode-aware, so non-ASCII
    letters and digits are kept unchanged.

    :param field_name: JSON field name
    :return: cleaned up JSON field name; "_" when field_name is empty
    """

    # keep alphanumeric characters, replace everything else with an underscore
    cleaned_up_field_name = "".join(
        char if char.isalnum() else "_" for char in field_name
    )

    # an empty name has no first character to inspect; "_" is the shortest valid name
    if not cleaned_up_field_name:
        return "_"

    # if field starts with digit, prepend it with underscore
    if cleaned_up_field_name[0].isdigit():
        cleaned_up_field_name = f"_{cleaned_up_field_name}"

    # trim after prefixing so the result never exceeds 300 characters
    return cleaned_up_field_name[:300]

def prioritize_one_data_type_from_multiple_ones_in_any_of(field_property):
    """
    Pick the single JSON data type that should represent an anyOf field.

    :param field_property: JSON field property, which has anyOf and multiple data types
    :return: one JSON data type (e.g. "string"), which is prioritized
    :raises KeyError: if an anyOf option declares a type that has no priority
    :raises ValueError: if no anyOf option declares a non-null type

    Simplification step removes anyOf columns from original JSON schema.

    There's one instance when original JSON schema has no anyOf, but anyOf gets added:

    original JSON schema:

     "simplification_stage_adds_anyOf": {
      "type": [
        "null",
        "integer",
        "string"
      ]
    }

     This is a simplified JSON schema where anyOf got added during
     simplification stage:

      {'simplification_stage_added_anyOf': {
            'anyOf': [
                {
                    'type': [
                        'integer',
                        'null'
                    ]
                },
                {
                    'type': [
                        'string',
                        'null'
                    ]
                }
            ]
        }
        }

    The VALUE of this dictionary will be the INPUT for this function.

    Each anyOf option contributes its first non-null type; "type" may be a
    list or a plain string.

    Prioritization (1 is the highest priority):
        1) string
        2) number
        3) integer
        4) boolean
        5) object
        6) array

    OUTPUT of the function is one JSON data type with the top priority
    """

    prioritization_dict = {"string": 1,
                           "number": 2,
                           "integer": 3,
                           "boolean": 4,
                           "object": 5,
                           "array": 6,
                           }

    any_of_data_types = {}

    for option in field_property['anyOf']:
        declared_types = option['type']

        # "type" may be written as a bare string instead of a list
        if isinstance(declared_types, str):
            declared_types = [declared_types]

        # "null" only says the field is nullable, it is never the data type itself
        data_type = next((t for t in declared_types if t != "null"), None)
        if data_type is None:
            continue

        any_of_data_types[data_type] = prioritization_dict[data_type]

    if not any_of_data_types:
        raise ValueError(f"no non-null data type found in anyOf: {field_property['anyOf']}")

    # return key with minimum value, which is the highest priority data type
    return min(any_of_data_types, key=any_of_data_types.get)


def _first_non_null_json_type(type_spec):
    """Return the first JSON type in type_spec (a list or a plain string) that is not "null"."""
    if isinstance(type_spec, str):
        return type_spec

    for json_type in type_spec:
        if json_type != "null":
            return json_type

    # nothing but "null": return the first entry so the lookup by the caller fails loudly
    return type_spec[0]


def convert_field_type(field_property):
    """
    Map one JSON schema field property to a BigQuery field type.

    Resolution order:
        1) "anyOf" present: the prioritized data type decides
        2) string with a "format": the format decides (date-time, date, time, bq-*);
           a format without a BigQuery counterpart (e.g. email, uri) stays STRING
        3) "items" without nested "properties" (array of scalars): the item type decides
        4) otherwise: the field's own type decides

    "null" entries in a type list are skipped, whatever their position.

    :param field_property: JSON field property
    :return: BigQuery SchemaField field_type
    :raises KeyError: if the resolved JSON type has no BigQuery counterpart
    """

    conversion_dict = {"string": "STRING",
                       "number": "FLOAT",
                       "integer": "INTEGER",
                       "boolean": "BOOLEAN",
                       "date-time": "TIMESTAMP",
                       "date": "DATE",
                       "time": "TIME",
                       "object": "RECORD",
                       "array": "RECORD",
                       "bq-geography": "GEOGRAPHY",
                       "bq-decimal": "DECIMAL",
                       "bq-bigdecimal": "BIGDECIMAL"
                       }

    if "anyOf" in field_property:
        prioritized_data_type = prioritize_one_data_type_from_multiple_ones_in_any_of(field_property)
        return conversion_dict[prioritized_data_type]

    json_type = _first_non_null_json_type(field_property["type"])

    if json_type == "string" and "format" in field_property:
        # a string with an unrecognised format is still a string
        return conversion_dict.get(field_property["format"], "STRING")

    if "items" in field_property and "properties" not in field_property["items"]:
        # array of scalars: the column type is the type of the elements
        return conversion_dict[_first_non_null_json_type(field_property["items"]["type"])]

    return conversion_dict[json_type]


def determine_field_mode(field_name, field_property):
    """
    Decide the BigQuery mode of a field: anything declaring "items" is repeated.

    :param field_name: one nested JSON field name (not used for the decision)
    :param field_property: one nested JSON field property
    :return: BigQuery SchemaField mode, 'REPEATED' or 'NULLABLE'
    """
    return 'REPEATED' if "items" in field_property else 'NULLABLE'


def replace_nullable_mode_with_required(schema_field_input):
    """
    Build a copy of a SchemaField whose mode is 'REQUIRED'.

    :param schema_field_input: BigQuery SchemaField to copy
    :return: new SchemaField with the same name, type, description, subfields
        and policy tags, and mode set to 'REQUIRED'
    """
    original = schema_field_input

    return SchemaField(
        name=original.name,
        field_type=original.field_type,
        mode='REQUIRED',
        description=original.description,
        fields=original.fields,
        policy_tags=original.policy_tags,
    )


def build_field(field_name, field_property):
    """
    Convert one JSON schema property into a BigQuery SchemaField, recursing into nested properties.

    A property is treated as nested when it has "properties" itself (object)
    or when its "items" have "properties" (array of objects). Nested
    properties become subfields; any other property gets no subfields.

    :param field_name: one nested JSON field name
    :param field_property: one nested JSON field property
    :return: one BigQuery nested SchemaField
    """
    items_have_properties = "items" in field_property and "properties" in field_property["items"]
    is_nested = items_have_properties or "properties" in field_property

    if is_nested:
        # the field's own "properties" win; otherwise use the ones declared on its "items"
        nested_properties = field_property.get(
            "properties",
            field_property.get("items", {}).get("properties"),
        )
        subfields = [
            build_field(nested_name, nested_property)
            for nested_name, nested_property in nested_properties.items()
        ]
    else:
        subfields = ()

    return SchemaField(
        name=create_valid_bigquery_field_name(field_name),
        field_type=convert_field_type(field_property),
        mode=determine_field_mode(field_name, field_property),
        description=None,
        fields=subfields,
        policy_tags=None,
    )


def build_schema(schema, key_properties=None, add_metadata=True, force_fields=None):
    """
    Convert a simplified JSON schema into the list of SchemaFields of one BigQuery table.

    :param schema: input simplified JSON schema
    :param key_properties: JSON schema fields which will become required BigQuery column
    :param add_metadata: do we want BigQuery metadata columns (e.g., when data was uploaded?)
    :param force_fields: You can force a field to a desired data type via force_fields flag.
            Maps a field name to a dict with "type" and optionally "mode" and "description".
            None (the default) means no field is forced.
            Use case example:
            tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
            which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
    :return: a list of BigQuery SchemaFields, which represents one BigQuery table
    """

    # NOTE: the module-level global is kept because code outside this function
    # may read it; inside this function it is only used for the lookup below.
    global required_fields

    required_fields = set(key_properties) if key_properties else set()

    # None instead of a mutable {} default, which would be shared between calls
    force_fields = force_fields or {}

    schema_bigquery = []

    for field_name, field_property in schema.get("properties", schema.get("items", {}).get("properties", {})).items():

        if field_name in force_fields:

            forced = force_fields[field_name]

            # forced fields are taken as configured: no subfields, name used as given
            next_field = (
                SchemaField(field_name, forced["type"],
                            forced.get("mode", "nullable"),
                            forced.get("description", None), ())
            )

        else:

            next_field = build_field(field_name, field_property)

            if field_name in required_fields:
                next_field = replace_nullable_mode_with_required(next_field)

        schema_bigquery.append(next_field)

    if add_metadata:

        for field_name in METADATA_FIELDS:
            schema_bigquery.append(SchemaField(name=field_name,
                                               field_type=METADATA_FIELDS[field_name]["bq_type"],
                                               mode='NULLABLE',
                                               description=None,
                                               fields=(),
                                               policy_tags=None)
                                   )

    return schema_bigquery


def _coerce_to_bool(value):
    """Cast value to bool, reading the strings "false" and "0" (any case, padded or not) as False."""
    if isinstance(value, str) and value.strip().lower() in ("false", "0"):
        # bool("false") would be True, because any non-empty string is truthy
        return False
    return bool(value)


def format_record_to_schema(record, bq_schema):
    """
    Purpose:
        Singer tap outputs two things: JSON schema and JSON record/data.
        Sometimes tap outputs data, where type doesn't match schema produced by the tap.
        This function makes sure that the data matches the schema.

    RECORD is not included into conversion_dict - it is done on purpose. RECORD is handled recursively.

    Dictionaries are updated in place: keys missing from bq_schema are removed
    and the remaining values are cast. None values are left untouched.
    Lists are rebuilt and must contain dictionaries only.

    :param record: JSON record generated by the tap and piped into target-bigquery
    :param bq_schema: BigQuery schema as a dict: field name -> dict with "type",
        optionally "mode" and, for records, "fields" (a dict of the same shape)
    :return: JSON record/data, where the data types match the schema
    :raises Exception: if a list contains an element that is not a dictionary
    """

    conversion_dict = {"BYTES": bytes,
                       "STRING": str,
                       "TIME": str,
                       "TIMESTAMP": str,
                       "DATE": str,
                       "DATETIME": str,
                       "FLOAT": float,
                       "NUMERIC": float,
                       "BIGNUMERIC": float,
                       "INTEGER": int,
                       "BOOLEAN": _coerce_to_bool,
                       "GEOGRAPHY": str,
                       "DECIMAL": str,
                       "BIGDECIMAL": str
                       }

    if isinstance(record, list):
        new_record = []
        for r in record:
            if not isinstance(r, dict):
                raise Exception(f"unhandled instance of list object in record: {r}")
            new_record.append(format_record_to_schema(r, bq_schema))
        return new_record

    if isinstance(record, dict):
        # iterate over a snapshot, because keys are removed from record inside the loop
        for k, v in list(record.items()):
            if k not in bq_schema:
                record.pop(k)
            elif v is None:
                continue
            elif bq_schema[k].get("fields"):
                # nested record (single or repeated): recurse with the subfield schema
                record[k] = format_record_to_schema(record[k], bq_schema[k]["fields"])
            elif bq_schema[k].get("mode") == "REPEATED":
                # repeated scalar: cast every element
                cast = conversion_dict[bq_schema[k]["type"]]
                record[k] = [cast(vi) for vi in v]
            else:
                record[k] = conversion_dict[bq_schema[k]["type"]](v)

    return record


In [118]:
def _build_bq_schema_dict(schema):  # could move this to derived class but seems right to handle in base
        """
        Convert BigQuery schema as a list to BigQuery schema as a dictionary
        :param schema, list of BigQuery SchemaFields
        :return: schema_dict, dict. Dict of BigQuery schema fields.
            Dict key is field name
            Dict value is a dict also. It has BigQuery mode and type
        """
        schema_dict = {}
        for field in schema:
            f = field if isinstance(field, dict) else field.to_api_repr()
            schema_dict[f["name"]] = f
            if f.get("fields"):
                schema_dict[f["name"]]["fields"] = _build_bq_schema_dict(f["fields"])
            schema_dict[f["name"]].pop("description")
            schema_dict[f["name"]].pop("name")
        return schema_dict


In [119]:
# Load the JSON schema of the companies stream.
# NOTE: the path is absolute and specific to one machine.
# JSON files are UTF-8, so the encoding is set explicitly instead of relying
# on the platform default.
with open(
    "/Users/zar/Desktop/modern-data-stack/meltano_taps/tap-startup-alberta/tap_startup_alberta/schemas/companies.json",
    encoding="utf-8",
) as f:
    companies_schema = json.load(f)
    

In [120]:
class RetriableAPIError(Exception):
    """Signals a failed API request that is safe to send again."""



In [135]:

import requests
import json
import copy

import logging


from pathlib import Path
from datetime import datetime
from typing import Any, Dict, Optional, Union, List, Iterable, Callable
from singer.schema import Schema
from memoization import cached
import backoff

from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
from singer_sdk.authenticators import BasicAuthenticator
from singer_sdk.authenticators import APIAuthenticatorBase, SimpleAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath


class StartupStream():
    """Stream reading companies from the Dealroom API (``POST /companies``).

    The request is filtered to the ``alberta_1`` location and paginated by
    item offset; every returned record is flattened into a single-level dict.
    """

    def __init__(
        self,
        schema: Optional[Union[Dict[str, Any], Schema]] = None
    ) -> None:
        """Initialize the stream.

        Args:
            schema: JSON schema for records in this stream.
        """
        self.schema = schema
        self.path = "/companies"
        self.config = {"token": TAP_STARTUP_ALBERTA_TOKEN, "app_id": TAP_STARTUP_ALBERTA_APP_ID}

        self._http_headers: dict = {}
        self._requests_session = requests.Session()
        self._compiled_jsonpath = None
        self._next_page_token_compiled_jsonpath = None

    @staticmethod
    def _url_encode(val: Union[str, datetime, bool, int, List[str]]) -> str:
        """Encode the val argument as url-compatible string.

        Args:
            val: Value to substitute into a URL template.

        Returns:
            For a string, the value with every "/" replaced by "%2F";
            for any other type, ``str(val)``.
        """
        if isinstance(val, str):
            result = val.replace("/", "%2F")
        else:
            result = str(val)
        return result

    def get_url(self, context: Optional[dict]) -> str:
        """Get stream entity URL.

        The URL is ``url_base`` + ``path``. Every ``{key}`` placeholder in it
        is replaced by the matching value from the stream config, overridden
        by the context.

        Args:
            context: Stream partition or context dictionary.

        Returns:
            A URL, optionally targeted to a specific partition or context.
        """
        url = "".join([self.url_base, self.path or ""])
        vals = copy.copy(dict(self.config))
        vals.update(context or {})
        for k, v in vals.items():
            search_text = "".join(["{", k, "}"])
            if search_text in url:
                url = url.replace(search_text, self._url_encode(v))
        return url

    rest_method = "POST"

    url_base = "https://api.dealroom.co/api/v2"

    records_jsonpath = "$.items[*]"  # Or override `parse_response`.

    # Page size: sent as "limit" in the payload and added to the offset to
    # reach the next page.
    max_offset = 25
    # Private constants. May not be supported in future releases:
    _LOG_REQUEST_METRICS: bool = True
    # Disabled by default for safety:
    _LOG_REQUEST_METRIC_URLS: bool = False

    @property
    def http_headers(self) -> dict:
        """Return the http headers needed."""
        headers = {

            "authority": "api.dealroom.co",
            "content-type": "application/json",
            "origin": "https://ecosystem.startalberta.ca",
            "x-dealroom-app-id": TAP_STARTUP_ALBERTA_APP_ID,
            "x-requested-with": "XMLHttpRequest",
            "accept-encoding": "gzip, deflate, br"

        }

        return headers

    def get_url_params(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization.

        Only the API token is sent as a query parameter; pagination is part
        of the request payload.
        """
        params: dict = {}
        params["token"] = TAP_STARTUP_ALBERTA_TOKEN
        return params

    def prepare_request_payload(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Optional[dict]:
        """Prepare the data payload for the REST API request.

        Args:
            context: Stream partition or context dictionary (unused).
            next_page_token: Offset of the first item of the page to request;
                None for the first page.

        Returns:
            The JSON body: requested fields, page size, offset, filters and
            sort order.
        """
        payload = {
            "fields":"id,angellist_url,appstore_app_id,client_focus,company_status,core_side_value,corporate_industries,create_date,crunchbase_url,employee_12_months_growth_delta,employee_12_months_growth_percentile,employee_12_months_growth_relative,employee_12_months_growth_unique,employee_3_months_growth_delta,employee_3_months_growth_percentile,employee_3_months_growth_relative,employee_3_months_growth_unique,employee_6_months_growth_delta,employee_6_months_growth_percentile,employee_6_months_growth_relative,employee_6_months_growth_unique,employees_chart,employees_latest,employees,entity_sub_types,facebook_url,founders_score_cumulated,founders,founders_top_university,founders_top_past_companies,fundings,fundings,growth_stage,has_strong_founder,has_super_founder,has_promising_founder,hq_locations,images,income_streams,industries,innovations,innovations_count,investments,investors,is_editorial,is_ai_data,is_from_traderegister,latest_revenue_enhanced,latest_valuation_enhanced,launch_month,launch_year,linkedin_url,lists_ids,matching_score,name,participated_events,past_founders_raised_10m,past_founders,path,playmarket_app_id,revenues,sdgs,service_industries,similarweb_12_months_growth_delta,similarweb_12_months_growth_percentile,similarweb_12_months_growth_relative,similarweb_12_months_growth_unique,similarweb_3_months_growth_delta,similarweb_3_months_growth_percentile,similarweb_3_months_growth_relative,similarweb_3_months_growth_unique,similarweb_6_months_growth_delta,similarweb_6_months_growth_percentile,similarweb_6_months_growth_relative,similarweb_6_months_growth_unique,similarweb_chart,sub_industries,tags,tagline,technologies,total_funding,total_jobs_available,trading_multiples,type,tech_stack,twitter_url,job_roles",
            # keep the page size in step with the offset increment used for pagination
            "limit": self.max_offset,
            "offset": next_page_token,
            "form_data":
             {"must":{"filters":{"all_slug_locations":{"values":["alberta_1"],"execution":"and"}},"execution":"and"},
              "should":{"filters":{}},
              "must_not":{"growth_stages":["mature"],"company_type":["service provider","government nonprofit"],"tags":["outside tech"],"company_status":["closed"]}
              },
         "sort":"-last_funding_date"}

        return payload

    def get_next_page_token(
        self, response: requests.Response, previous_token: Optional[Any],
    ) -> Optional[Any]:
        """Return token identifying next page or None if all records have been read.

        The offset of the page just received is read back from the request
        body (a missing/null offset counts as 0) and advanced by
        ``max_offset``. The result is returned only while it is still below
        the ``total`` reported by the response, so no request is made for a
        page that starts past the last item.

        Args:
            response: A raw `requests.Response`_ object.
            previous_token: Previous pagination reference (not used; the
                offset is taken from the request that produced ``response``).

        Returns:
            Offset of the next page, or None when there is no further page.

        .. _requests.Response:
            https://docs.python-requests.org/en/latest/api/#requests.Response
        """
        request_body = response.request.body
        if isinstance(request_body, bytes):
            request_body = request_body.decode()

        current_offset = json.loads(request_body)["offset"] or 0
        total_items = response.json()['total']

        next_offset = current_offset + self.max_offset
        if next_offset < total_items:
            return next_offset
        return None

    @property
    def requests_session(self) -> requests.Session:
        """Get requests session.

        Returns:
            The `requests.Session`_ object for HTTP requests.

        .. _requests.Session:
            https://docs.python-requests.org/en/latest/api/#request-sessions
        """
        if not self._requests_session:
            self._requests_session = requests.Session()
        return self._requests_session

    def validate_response(self, response: requests.Response) -> None:
        """Validate HTTP response.

        Client errors (4xx) are treated as fatal; server errors (5xx) are
        treated as transient and raised as :class:`RetriableAPIError`, which
        is the exception the decorator from :meth:`request_decorator`
        retries on.

        Args:
            response: A `requests.Response`_ object.

        Raises:
            RuntimeError: If the status code is 4xx (not retriable).
            RetriableAPIError: If the status code is 5xx (retriable).

        .. _requests.Response:
            https://docs.python-requests.org/en/latest/api/#requests.Response
        """
        if 400 <= response.status_code < 500:
            msg = (
                f"{response.status_code} Client Error: "
                f"{response.reason} for path: {self.path}"
            )
            # a plain string cannot be raised; wrap the message in an exception
            raise RuntimeError(msg)

        elif 500 <= response.status_code < 600:
            msg = (
                f"{response.status_code} Server Error: "
                f"{response.reason} for path: {self.path}"
            )
            raise RetriableAPIError(msg)

    def request_decorator(self, func: Callable) -> Callable:
        """Instantiate a decorator for handling request failures.

        The decorated function is retried with exponential backoff (factor 2,
        at most 5 tries) whenever it raises :class:`RetriableAPIError`.

        Args:
            func: Function to decorate.

        Returns:
            A decorated method.
        """
        decorator: Callable = backoff.on_exception(
            backoff.expo,
            (RetriableAPIError,),
            max_tries=5,
            factor=2,
        )(func)
        return decorator

    def _request(
        self, prepared_request: requests.PreparedRequest, context: Optional[dict]
    ) -> requests.Response:
        """Send a prepared request and validate the response.

        Args:
            prepared_request: Request built by :meth:`prepare_request`.
            context: Stream partition or context dictionary (unused).

        Returns:
            The response, once :meth:`validate_response` accepted it.
        """
        response = self.requests_session.send(prepared_request)

        self.validate_response(response)
        logging.debug("Response received successfully.")
        return response

    def request_records(self, context: Optional[dict]) -> Iterable[dict]:
        """Request records from REST endpoint(s), returning response records.

        Pages are requested one after another until
        :meth:`get_next_page_token` returns no further token.

        Args:
            context: Stream partition or context dictionary.

        Yields:
            An item for every record in the response.

        Raises:
            RuntimeError: If a loop in pagination is detected. That is, when two
                consecutive pagination tokens are identical.
        """
        next_page_token: Any = None
        finished = False
        decorated_request = self.request_decorator(self._request)

        while not finished:
            prepared_request = self.prepare_request(
                context, next_page_token=next_page_token
            )
            resp = decorated_request(prepared_request, context)
            for row in self.parse_response(resp):
                yield row
            previous_token = copy.deepcopy(next_page_token)
            next_page_token = self.get_next_page_token(
                response=resp, previous_token=previous_token
            )
            if next_page_token and next_page_token == previous_token:
                raise RuntimeError(
                    f"Loop detected in pagination. "
                    f"Pagination token {next_page_token} is identical to prior token."
                )
            # Cycle until get_next_page_token() no longer returns a value
            finished = not next_page_token

    def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
        """Return a generator of row-type dictionary objects.

        Each row emitted should be a dictionary of property names to their values.

        Args:
            context: Stream partition or context dictionary.

        Yields:
            One item per (possibly processed) record in the API.
        """
        for record in self.request_records(context):
            transformed_record = self.post_process(record, context)
            if transformed_record is None:
                # Record filtered out during post_process()
                continue
            yield transformed_record

    def prepare_request(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> requests.PreparedRequest:
        """Prepare a request object.

        Args:
            context: Stream partition or context dictionary.
            next_page_token: Token, page number or any request argument to request the
                next page of data.

        Returns:
            A prepared request with the stream's URL, query parameters,
            HTTP headers and JSON payload.
        """
        http_method = self.rest_method
        url: str = self.get_url(context)
        params: dict = self.get_url_params(context, next_page_token)
        request_data = self.prepare_request_payload(context, next_page_token)
        headers = self.http_headers

        request = self.requests_session.prepare_request(
            requests.Request(
                method=http_method,
                url=url,
                params=params,
                headers=headers,
                json=request_data,
            ),
        )
        return request

    def parse_response(self, response: requests.Response) -> Iterable[dict]:
        """Parse the response and return an iterator of result rows.

        Rows are the matches of ``records_jsonpath`` in the JSON body.
        """
        yield from extract_jsonpath(self.records_jsonpath, input=response.json())

    def post_process(self, row: dict, context: Optional[dict]) -> dict:
        """Flatten a raw record with ``flatten_json.flatten``."""
        row = flatten(row)
        return row

In [136]:
# Create the stream with the companies schema loaded earlier.
startup = StartupStream(schema=companies_schema)

In [137]:
# get_records() is a generator function: no request is sent until the first
# item is pulled from it.
records = startup.get_records(context={})

In [138]:
# Pull the first record; this triggers the request for the first page.
record = next(records)

In [139]:
# Display the first (flattened) record.
record

{'id': 3019447,
 'angellist_url': None,
 'appstore_app_id': None,
 'client_focus_0_name': 'consumer',
 'company_status': 'operational',
 'core_side_value': None,
 'corporate_industries': [],
 'create_date': '2021-06-15T09:58:25+0100',
 'crunchbase_url': None,
 'employee_12_months_growth_delta': None,
 'employee_12_months_growth_percentile': None,
 'employee_12_months_growth_relative': None,
 'employee_12_months_growth_unique': None,
 'employee_3_months_growth_delta': None,
 'employee_3_months_growth_percentile': None,
 'employee_3_months_growth_relative': None,
 'employee_3_months_growth_unique': None,
 'employee_6_months_growth_delta': None,
 'employee_6_months_growth_percentile': None,
 'employee_6_months_growth_relative': None,
 'employee_6_months_growth_unique': None,
 'employees_chart_0_date': '2021-07-26',
 'employees_chart_0_value': 1,
 'employees_chart_1_date': '2021-10-14',
 'employees_chart_1_value': 1,
 'employees_latest': None,
 'employees': '2-10',
 'entity_sub_types': [],

In [140]:
# Rewrite the record's keys into valid BigQuery field names.
nr = cleanup_record(companies_schema, record)

In [141]:
import pandas as pd
# Show the cleaned record as a one-row DataFrame.
pd.DataFrame([nr])

Unnamed: 0,id,angellist_url,appstore_app_id,client_focus_0_name,company_status,core_side_value,corporate_industries,create_date,crunchbase_url,employee_12_months_growth_delta,...,technologies_3_name,technologies_4_id,technologies_4_name,total_funding,total_jobs_available,trading_multiples,type,tech_stack,twitter_url,job_roles
0,3019447,,,consumer,operational,,[],2021-06-15T09:58:25+0100,,,...,big data,2,artificial intelligence,0,0,,company,[],https://twitter.com/heyautohq,[]
