<a href="https://colab.research.google.com/github/stevejj4/Machine-Learning/blob/main/Insurance_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#!pip install google-cloud-bigquery pandas


In [2]:
from google.colab import auth
auth.authenticate_user()

from google.cloud import bigquery
import pandas as pd

# Replace with your project ID
project_id = 'river-messenger-430112-e1'

# Initialize BigQuery client
client = bigquery.Client(project=project_id)


In [3]:
import gspread
from google.auth import default

# Authenticate and create the gspread client
creds, _ = default()
gc = gspread.authorize(creds)

# Open the Google Sheets by title
worksheet_customers = gc.open('Insurance_data').worksheet('Customers')
worksheet_policies = gc.open('Insurance_data').worksheet('Policies')
worksheet_interactions = gc.open('Insurance_data').worksheet('Interactions')
worksheet_claims = gc.open('Insurance_data').worksheet('Claims')

# Load the data into pandas DataFrames
customers_df = pd.DataFrame(worksheet_customers.get_all_records())
policies_df = pd.DataFrame(worksheet_policies.get_all_records())
interactions_df = pd.DataFrame(worksheet_interactions.get_all_records())
claims_df = pd.DataFrame(worksheet_claims.get_all_records())


In [4]:
def upload_to_bigquery(df, table_name):
    dataset_id = 'Insurance_data'  # Correct dataset ID
    # Use the `TableReference` class to construct the table ID in the correct format
    table_ref = bigquery.TableReference(
        bigquery.DatasetReference(project_id, dataset_id), table_name
    )

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    job = client.load_table_from_dataframe(
        df, table_ref, job_config=job_config
    )

    job.result()  # Wait for the job to complete

    print(f"Loaded {job.output_rows} rows into {table_ref}.")

# Upload each DataFrame to BigQuery
upload_to_bigquery(customers_df, 'customers')
upload_to_bigquery(policies_df, 'policies')
upload_to_bigquery(interactions_df, 'interactions')
upload_to_bigquery(claims_df, 'claims')

Loaded 2004 rows into river-messenger-430112-e1.Insurance_data.customers.
Loaded 2004 rows into river-messenger-430112-e1.Insurance_data.policies.
Loaded 5000 rows into river-messenger-430112-e1.Insurance_data.interactions.
Loaded 3000 rows into river-messenger-430112-e1.Insurance_data.claims.


In [5]:
!pip install pyspark==3.1.2
!pip install google-cloud-bigquery
!pip install pandas-gbq




In [22]:
from pyspark.sql import SparkSession

# Initialize a SparkSession with BigQuery connector
spark = SparkSession.builder \
    .appName('BigQuerySparkApp') \
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2') \
    .getOrCreate()

# Set Google Cloud project ID and dataset ID
# Make sure these values are correct and match the ones in your BigQuery project.
project_id = 'river-messenger-430112-e1'
dataset_id = 'Insurance_data'

# Define table names
# Double check that these table names exist in your BigQuery dataset.
customers_table = f"{project_id}.{dataset_id}.customers"
policies_table = f"{project_id}.{dataset_id}.policies"
interactions_table = f"{project_id}.{dataset_id}.interactions"
claims_table = f"{project_id}.{dataset_id}.claims"

# Read data from BigQuery into Spark DataFrames
# Verify that the table names are correct and exist in your BigQuery project.
df_customers = spark.read.format('bigquery').option('table', customers_table).load()
df_policies = spark.read.format('bigquery').option('table', policies_table).load()
df_interactions = spark.read.format('bigquery').option('table', interactions_table).load()
df_claims = spark.read.format('bigquery').option('table', claims_table).load()

# Print the schema of each DataFrame to check if the data was loaded correctly
print("Schema of df_customers:")
df_customers.printSchema()

print("\nSchema of df_policies:")
df_policies.printSchema()

print("\nSchema of df_interactions:")
df_interactions.printSchema()

print("\nSchema of df_claims:")
df_claims.printSchema()

Schema of df_customers:
root
 |-- CustomerID: long (nullable = true)
 |-- Age: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Region: string (nullable = true)


Schema of df_policies:
root
 |-- PolicyID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- PolicyType: string (nullable = true)
 |-- PolicyStartDate: string (nullable = true)
 |-- PolicyEndDate: string (nullable = true)
 |-- PremiumAmount: double (nullable = true)


Schema of df_interactions:
root
 |-- InteractionID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- InteractionDate: string (nullable = true)
 |-- InteractionType: string (nullable = true)
 |-- InteractionOutcome: string (nullable = true)


Schema of df_claims:
root
 |-- ClaimID: long (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- ClaimDate: string (nullable = true)
 |-- ClaimAmount: double (nullable = true)
 |-- ClaimStatus: string (nullable = true)



In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd

# Example data preparation
# Assume df_customers is a Spark DataFrame; you need to convert it to a Pandas DataFrame first
df_customers_pd = df_customers.toPandas()

# Example feature and target selection
features = df_customers_pd[['feature1', 'feature2']].values
targets = df_customers_pd['target'].values

# Convert to PyTorch tensors
features_tensor = torch.tensor(features, dtype=torch.float32)
targets_tensor = torch.tensor(targets, dtype=torch.float32)

# Define a simple model
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(2, 50)  # Adjust input size based on your features
        self.fc2 = nn.Linear(50, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Create model, define loss function and optimizer
model = SimpleNN()
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Training loop
for epoch in range(100):
    optimizer.zero_grad()
    outputs = model(features_tensor)
    loss = criterion(outputs.squeeze(), targets_tensor)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}: Loss = {loss.item()}")

# Save the model
torch.save(model.state_dict(), 'model.pth')


Py4JJavaError: An error occurred while calling o329.collectToPython.
: com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.InvalidArgumentException: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:47)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
	at com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
	at com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
	at com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
	at com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
	at com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:564)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:534)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	Suppressed: com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
		at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
		at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
		at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryReadClient.createReadSession(BigQueryReadClient.java:232)
		at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.scala:141)
		at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:332)
		at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:365)
		at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:420)
		at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:364)
		at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:332)
		at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
		at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
		at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
		at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
		at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
		at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
		at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
		at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
		at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
		at scala.collection.Iterator.foreach(Iterator.scala:941)
		at scala.collection.Iterator.foreach$(Iterator.scala:941)
		at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
		at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
		at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
		at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
		at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
		at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
		at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
		at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
		at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
		at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:391)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:104)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
		at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:104)
		at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:97)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:117)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
		at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:117)
		at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:161)
		at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
		at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
		at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:206)
		at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
		at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
		at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
		at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
		at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
		at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.base/java.lang.reflect.Method.invoke(Method.java:566)
		at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
		at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
		at py4j.Gateway.invoke(Gateway.java:282)
		at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
		at py4j.commands.CallCommand.execute(CallCommand.java:79)
		at py4j.GatewayConnection.run(GatewayConnection.java:238)
		... 1 more
Caused by: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.Status.asRuntimeException(Status.java:535)
	... 13 more
