<a href="https://colab.research.google.com/github/stevejj4/Insurance-data-lifecycle/blob/main/data_ingestion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
#!pip install google-cloud-bigquery pandas


In [16]:
from google.colab import auth
from google.cloud import bigquery
import pandas as pd

# Colab-only step: authenticate the current user so the Google client
# libraries created below can obtain credentials.
auth.authenticate_user()

# Google Cloud project that the BigQuery client is bound to.
project_id = 'river-messenger-430112-e1'

# Shared BigQuery client used by the upload and query cells.
client = bigquery.Client(project=project_id)


In [17]:
import gspread
from google.auth import default

# Build a gspread client from the default credentials of the current session.
creds, _ = default()
gc = gspread.authorize(creds)

# Open the spreadsheet once by title and reuse the handle for every tab,
# instead of repeating the lookup for each worksheet.
spreadsheet = gc.open('Insurance_data')
worksheet_customers = spreadsheet.worksheet('Customers')
worksheet_policies = spreadsheet.worksheet('Policies')
worksheet_interactions = spreadsheet.worksheet('Interactions')
worksheet_claims = spreadsheet.worksheet('Claims')

# Load each worksheet's records into its own pandas DataFrame.
customers_df = pd.DataFrame(worksheet_customers.get_all_records())
policies_df = pd.DataFrame(worksheet_policies.get_all_records())
interactions_df = pd.DataFrame(worksheet_interactions.get_all_records())
claims_df = pd.DataFrame(worksheet_claims.get_all_records())


In [18]:
def upload_to_bigquery(df, table_name, dataset_id='Insurance_data'):
    """Load a pandas DataFrame into a BigQuery table, replacing its contents.

    Uses the notebook-level ``client`` (BigQuery client) and ``project_id``
    defined in an earlier cell, waits for the load job to finish, and prints
    the number of rows loaded.

    Args:
        df: pandas DataFrame to upload.
        table_name: Name of the destination table inside the dataset.
        dataset_id: Destination dataset. Defaults to ``'Insurance_data'``,
            the value previously hard-coded in the function body.

    Raises:
        ValueError: If ``df`` has no rows. The load uses ``WRITE_TRUNCATE``,
            so the check runs before any BigQuery call to avoid replacing an
            existing table with nothing.
    """
    if df.empty:
        raise ValueError(
            f"Refusing to load an empty DataFrame into '{table_name}': "
            "WRITE_TRUNCATE would replace the existing table contents."
        )

    # Build the fully qualified destination from project, dataset and table.
    table_ref = bigquery.TableReference(
        bigquery.DatasetReference(project_id, dataset_id), table_name
    )

    # WRITE_TRUNCATE: the destination table is overwritten on every run.
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    job = client.load_table_from_dataframe(
        df, table_ref, job_config=job_config
    )

    job.result()  # Wait for the job to complete

    print(f"Loaded {job.output_rows} rows into {table_ref}.")

# Push every sheet-backed DataFrame to its BigQuery table, in the same order
# as before: customers, policies, interactions, claims.
for table_name, frame in (
    ('customers', customers_df),
    ('policies', policies_df),
    ('interactions', interactions_df),
    ('claims', claims_df),
):
    upload_to_bigquery(frame, table_name)

Loaded 2004 rows into river-messenger-430112-e1.Insurance_data.customers.
Loaded 2004 rows into river-messenger-430112-e1.Insurance_data.policies.
Loaded 5000 rows into river-messenger-430112-e1.Insurance_data.interactions.
Loaded 3000 rows into river-messenger-430112-e1.Insurance_data.claims.


In [19]:
!pip install pyspark==3.1.2
!pip install google-cloud-bigquery
!pip install pandas-gbq




In [20]:
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession that pulls in the spark-bigquery connector
# package through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName('BigQuerySparkApp')
    .config(
        'spark.jars.packages',
        'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2',
    )
    .getOrCreate()
)

# BigQuery project and dataset that hold the uploaded tables.
# These must match the values used when the tables were loaded.
project_id = 'river-messenger-430112-e1'
dataset_id = 'Insurance_data'

# Fully qualified table IDs in the form <project>.<dataset>.<table>.
table_prefix = f"{project_id}.{dataset_id}"
customers_table = f"{table_prefix}.customers"
policies_table = f"{table_prefix}.policies"
interactions_table = f"{table_prefix}.interactions"
claims_table = f"{table_prefix}.claims"


def _read_bigquery_table(table_id):
    """Load one BigQuery table into a Spark DataFrame via the connector."""
    return spark.read.format('bigquery').option('table', table_id).load()


# Read each table into its own Spark DataFrame; the tables must already exist.
df_customers = _read_bigquery_table(customers_table)
df_policies = _read_bigquery_table(policies_table)
df_interactions = _read_bigquery_table(interactions_table)
df_claims = _read_bigquery_table(claims_table)

# Show the schema of every Spark DataFrame as a quick check that the reads
# returned the expected columns and types.
schema_targets = [
    ("Schema of df_customers:", df_customers),
    ("\nSchema of df_policies:", df_policies),
    ("\nSchema of df_interactions:", df_interactions),
    ("\nSchema of df_claims:", df_claims),
]
for heading, spark_df in schema_targets:
    print(heading)
    spark_df.printSchema()

Schema of df_customers:
root
 |-- CustomerID: long (nullable = true)
 |-- Age: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Region: string (nullable = true)


Schema of df_policies:
root
 |-- PolicyID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- PolicyType: string (nullable = true)
 |-- PolicyStartDate: string (nullable = true)
 |-- PolicyEndDate: string (nullable = true)
 |-- PremiumAmount: double (nullable = true)


Schema of df_interactions:
root
 |-- InteractionID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- InteractionDate: string (nullable = true)
 |-- InteractionType: string (nullable = true)
 |-- InteractionOutcome: string (nullable = true)


Schema of df_claims:
root
 |-- ClaimID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- ClaimDate: string (nullable = true)
 |-- ClaimAmount: double (nullable = true)
 |-- ClaimStatus: string (nullable = true)



In [24]:

# Join customers to their policies and claims.
# NOTE(review): policies and claims are each joined on CustomerID only, so a
# customer with several policies and several claims yields one row per
# policy/claim combination - confirm this fan-out is intended.
query = """
SELECT c.CustomerID, c.Age, p.PolicyType, cl.ClaimAmount, cl.ClaimStatus
FROM `river-messenger-430112-e1.Insurance_data.customers` c
JOIN `river-messenger-430112-e1.Insurance_data.policies` p ON c.CustomerID = p.CustomerID
JOIN `river-messenger-430112-e1.Insurance_data.claims` cl ON c.CustomerID = cl.CustomerID;
"""

# Run the query through the BigQuery client and collect the result in pandas.
query_job = client.query(query)
df = query_job.to_dataframe()

# Preview the first rows of the joined result.
df.head()


Unnamed: 0,CustomerID,Age,PolicyType,ClaimAmount,ClaimStatus
0,43,18,Life,536.1,Pending
1,373,18,Life,19014.72,Approved
2,373,18,Life,3056.66,Rejected
3,411,18,Health,1820.65,Approved
4,420,18,Life,12201.43,Rejected


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Histogram of claim amounts from the joined query result.
fig, ax = plt.subplots()
sns.histplot(df['ClaimAmount'], ax=ax)
ax.set_title('Distribution of Claim Amounts')
ax.set_xlabel('Claim Amount')
ax.set_ylabel('Frequency')
plt.show()


In [25]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# List the columns available in the joined DataFrame.
print(df.columns)

# Model inputs: one numeric and one categorical feature; target is the claim amount.
numeric_features = ['Age']
categorical_features = ['PolicyType']
X = df[numeric_features + categorical_features]
y = df['ClaimAmount']

# Hold out 20% of the rows for evaluation; fixed seed for a repeatable split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Per-column preprocessing: standardize the numeric feature and one-hot encode
# the categorical one (categories unseen during fit are ignored at predict time).
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features),
    ]
)

# Chain preprocessing and the linear model so both are fitted together.
pipeline = Pipeline(
    steps=[
        ('preprocessor', preprocessor),
        ('model', LinearRegression()),
    ]
)

# Fit on the training split only.
pipeline.fit(X_train, y_train)

# Score the held-out split with mean squared error.
y_pred = pipeline.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f'Mean Squared Error: {mse}')


Index(['CustomerID', 'Age', 'PolicyType', 'ClaimAmount', 'ClaimStatus'], dtype='object')
Mean Squared Error: 31077183.753261
