In [24]:
%autoawait asyncio

In [25]:
from google.colab import auth
from google.cloud import bigquery
from google.colab import data_table
import pandas as pd
import numpy as np
import asyncio
import aiohttp
import logging
from google.cloud import bigquery

project = 'data-warehouse-267920' # Project ID inserted based on the query results selected to explore
location = 'US' # Location inserted based on the query results selected to explore
client = bigquery.Client(project=project, location=location)
data_table.enable_dataframe_formatter()
auth.authenticate_user()

In [26]:
query = """
SELECT
  quote_id ,
  user_id ,
  zipcode zip_code,
  case when substr(driver_classification, 1,1) = "M" then "male" else "female" end as gender ,
  substr(driver_classification, 2,2) as age ,
  vin ,
  carYear ,
  case when inflection_score is not null then inflection_score
       when inflection_status = "Unavailable" and inflection_score is null then 400
       when inflection_status = "InsufficientData" and inflection_score is null then 525
       else null end as credit_score,
  case when vio_1_code != "0" then 1 else 0 end as vio_1,
  case when vio_2_code != "0" then 1 else 0 end as vio_2 ,
  case when acc_1_code != "0" then 1 else 0 end as acc_1 ,
  case when acc_2_code != "0" then 1 else 0 end as acc_2 ,
  "{bi}" as bi,
  "{pd}" as pd ,
  "{comp}" as comp,
  "{coll}" as coll ,
  "{marital_status}" as marital_status,
  "{days_lapsed}" as days_lapsed
FROM `data-warehouse-267920.pricing_hub.elasticity_quotes`
where inflection_status is not null and quote_created_at>= '2024-05-01' and quote_id not in (select quote_id from `price_comparison.competitor_prices_by_quote`)
qualify row_number() over (partition by user_id  order by quote_created_at desc) = 1
limit 20
"""

In [27]:
async def run_main():
    client = bigquery.Client("data-warehouse-267920")
    # query_job = client.query(query)
    # query_job = client.query(query.format(bi="25/50", pd="15000", comp="500", coll="500", marital_status="single", days_lapsed=0 ))
    # df = query_job.to_dataframe()

    # url = "http://127.0.0.1:8000/compare"
    url = "https://comparator-stg-axjyerdcyq-uc.a.run.app/compare"

    difference_list = []
    exception_list = []
    responses = []

    BI = "25/50"
    PD = "15000"
    CP_CL = [('500','500'), ('None', 'None')]
    MS = ['single', 'married']
    DAYS_LAPSED = [0]

    # calling the api
    async def fetch_comparison(session, body, url, row_index, user_id):
        try:
            async with session.post(url, json=body, timeout=360) as response:
                try:
                    response_json = await response.json()
                    response_json['index'] = row_index
                    # print(f'{user_id},{response_json},{body}')
                    response_json['user_id'] = user_id
                    responses.append(response_json)
                except TimeoutError as e:
                    exception_list.append((row_index, e))
                    # logger.error(f"Error for index {row_index}: {e}")
                except Exception as e:
                    try:
                        async with session.post(url, json=body, timeout=360) as response_2:
                            response_text = await response_2.text()
                    except TimeoutError as e:
                        exception_list.append((row_index, e))
                    except Exception as e:
                        exception_list.append((row_index, e))
        except TimeoutError as e:
            exception_list.append((row_index, e))
            # logger.error(f"Error for index {row_index}: {e}")
        except Exception as e:
            exception_list.append((row_index, e))

    # modifying data to fit api requirements
    async def run_sample():
        async with aiohttp.ClientSession() as session:
            tasks = []
            DF = pd.DataFrame()

            for cp, cl in CP_CL:
                for ms in MS:
                    for days in DAYS_LAPSED:
                        competitor_data = pd.DataFrame(columns=["quote_id", "bi_limits", "pd_limits", "comp_deductible", "coll_deductible", "days_lapsed", "marital_status", "carrier", "estimate"])

                        query_job = client.query(query.format(bi=BI, pd=PD, comp=cp, coll=cl, marital_status=ms, days_lapsed=days))
                        df = query_job.to_dataframe()
                        DF = pd.concat([df,DF])

                        for row in df.iterrows():
                            row = row[1]
                            # partial vin from vin with wildcard for check (1gykpdrs.r)
                            partial_vin = row["vin"][:8].upper() + '.' + row["vin"][9].upper()

                            # insurance_score
                            # 800
                            # 525
                            # 675
                            # 400

                            if row['credit_score'] >= 800:
                                row['credit_score'] = 800
                            elif row['credit_score'] >= 675:
                                row['credit_score'] = 675
                            elif row['credit_score'] >= 525:
                                row['credit_score'] = 525
                            else:
                                row['credit_score'] = 400

                            acv = np.minimum(2, row['vio_1'] + row['vio_2'] + row['acc_1'] + row['acc_2'])

                            payload = {
                                "state": "AZ",
                                "age": row["age"],
                                "gender": row["gender"].lower(),
                                "acv": str(acv),
                                "zip": str(row["zip_code"]),
                                "insurance_score": str(row["credit_score"]),
                                "pd_limit": str(PD),
                                "bi_limit": str(BI),
                                "comp_deductible": str(cp),
                                "coll_deductible": str(cl),
                                "marital_status": str(ms),
                                "days_lapsed": str(days),
                                "vehicle": {
                                    "partial_vin": partial_vin
                                }
                            }
                            tasks.append(fetch_comparison(session, payload, url, row.name, row['user_id']))
                print(payload)
            await asyncio.gather(*tasks)
            return DF
    DF = await run_sample()

    # appending new api data to base data and recording any errors
    # difference_list = pd.DataFrame([ {'index': r['index'], **rates} for r in responses for rates in r.get('rates'] ])
    competitor_prices_list = []
    errors = []

    print("Number of Responses: ", len(responses))

    print(f"Number of Users that encountered error in request:  {len(exception_list)}")
    from collections import Counter

    # Count exceptions by type
    exceptions_count = Counter(type(exc[1]).__name__ for exc in exception_list)

    # exceptions_count
    print(exceptions_count)

    for r in responses:
        try:
            for rates in r['rates']:
                rates['index'] = r['index']
                rates['upper_rate'] = rates['daily_rate']['upper_rate']
                rates['lower_rate'] = rates['daily_rate']['lower_rate']
                rates['rate'] = rates['daily_rate']['rate']
                competitor_prices_list.append(rates)
        except KeyError as e:
            # Something Wrong With Data Validation in Comparison API
            errors.append(
                {'msg': r['detail'][0]['msg']}
            )
            continue

    # putting new data and error data into dataframes

    competitor_prices_list = pd.DataFrame(competitor_prices_list)
    # print(competitor_prices_list)
    df_errors = pd.DataFrame(errors)

    competitor_prices_list.set_index('index', inplace=True)

    competitor_prices = competitor_prices_list[['carrier', 'rate', 'upper_rate', 'lower_rate']].rename(
        columns={'rate': 'estimate'})
    competitor_prices = competitor_prices.join(DF)

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        write_disposition="WRITE_APPEND",
        create_disposition="CREATE_IF_NEEDED",
        schema_update_options="ALLOW_FIELD_ADDITION",
    )

    # TODO: Change the table id to the correct one
    TABLE_ID = "price_comparison.competitor_prices_by_quote"

    client = bigquery.Client("data-warehouse-267920")
    job = client.load_table_from_dataframe(
        competitor_prices,
        destination=TABLE_ID,
        job_config=job_config
    )

    job.result()

    return True


def main(request):
    asyncio.run(run_main())
    return "200 OK"

In [28]:
await run_main()

{'state': 'AZ', 'age': '87', 'gender': 'male', 'acv': '0', 'zip': '85225', 'insurance_score': '525', 'pd_limit': '15000', 'bi_limit': '25/50', 'comp_deductible': '500', 'coll_deductible': '500', 'marital_status': 'married', 'days_lapsed': '0', 'vehicle': {'partial_vin': 'JTDKDTB3.D'}}
{'state': 'AZ', 'age': '22', 'gender': 'female', 'acv': '0', 'zip': '85201', 'insurance_score': '525', 'pd_limit': '15000', 'bi_limit': '25/50', 'comp_deductible': 'None', 'coll_deductible': 'None', 'marital_status': 'married', 'days_lapsed': '0', 'vehicle': {'partial_vin': '3N1CN7AP.H'}}
Number of Responses:  80
Number of Users that encountered error in request:  0
Counter()


True