# Data Analysis
This notebook connects to Google Cloud Platform (GCP), reads the e-commerce customer dataset from a Cloud Storage (GCS) bucket, and segments customers with PySpark.

To make the connection, we use a [GCP service account](https://cloud.google.com/iam/docs/service-account-overview) that has permission to access our bucket.

### Steps
1. Open the Service accounts page in the GCP console (IAM & Admin > Service accounts).
2. Open `deng-capstone-service-account`.
3. Create a new JSON key, download it locally, and rename the file to something concise, since the next step refers to its path.
4. Set the key file path as an option in your Spark configuration: `spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", "/path/to/file/<renamed>.json")`
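Step 4 goes through the private `spark._jsc` handle. A minimal sketch of an alternative, assuming the same connector property name used above: Spark copies every property prefixed with `spark.hadoop.` into the Hadoop configuration, so the key file path can be supplied while the session is built. The helper name `gcs_keyfile_conf` is hypothetical.

```python
def gcs_keyfile_conf(keyfile_path: str) -> dict:
    """Return Spark properties that point the GCS connector at a service-account key.

    Spark copies any property prefixed with "spark.hadoop." into the Hadoop
    configuration, so this mirrors
    spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", ...)
    without touching the private _jsc handle.
    """
    if not keyfile_path.endswith(".json"):
        raise ValueError(f"expected a JSON key file, got {keyfile_path!r}")
    return {
        "spark.hadoop.google.cloud.auth.service.account.json.keyfile": keyfile_path,
    }


# Usage (requires pyspark); apply each property before getOrCreate():
#   builder = SparkSession.builder.appName("data-engineering-capstone")
#   for key, value in gcs_keyfile_conf(r"C:\Pkey\key.json").items():
#       builder = builder.config(key, value)
#   spark = builder.getOrCreate()
```

Because these properties are read when the Spark context starts, they are most reliably picked up by a freshly created session rather than one that already exists in the kernel.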

### Resources
1. https://gobiviswa.medium.com/google-cloud-storage-handson-connecting-using-pyspark-5eefc0d8d932
2. https://cloud.google.com/iam/docs/service-account-overview


## Spark Application setup

In [3]:
!pip install google-cloud-storage




In [4]:
from pyspark.sql import SparkSession


# Build the session and add the GCS connector jar so Spark can read gs:// paths.
# eagerEval renders DataFrames as tables when they end a notebook cell.
spark = SparkSession.builder \
    .appName('data-engineering-capstone') \
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()

# Set GCS credentials. Ensure the path points to your downloaded key file.
# The raw string keeps Windows backslashes from being read as escape sequences.
spark._jsc.hadoopConfiguration().set(
    "google.cloud.auth.service.account.json.keyfile",
    r"C:\Pkey\key.json")



## Read from GCS

In [5]:
# file path to data in GCS bucket

file_path = "gs://ecommerce-customer-bucket/e-commerce-customer-behavior.csv"

df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(5)

Py4JJavaError: An error occurred while calling o43.csv.
: java.io.IOException: Error accessing gs://ecommerce-customer-bucket/e-commerce-customer-behavior.csv. reason=403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/ecommerce-customer-bucket/o/e-commerce-customer-behavior.csv?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "location" : "Authorization",
    "locationType" : "header",
    "message" : "The billing account for the owning project is disabled in state delinquent",
    "reason" : "accountDisabled"
  } ],
  "message" : "The billing account for the owning project is disabled in state delinquent"
}
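The 403 above has reason `accountDisabled`: Cloud Storage rejected the request because the billing account of the project that owns the bucket is disabled (state delinquent), so the key file is not necessarily at fault. A minimal sketch of a pre-flight check that surfaces this kind of error before Spark starts a job, assuming the `google-cloud-storage` package installed earlier; the helper names `split_gs_uri` and `object_is_readable` are hypothetical.

```python
from urllib.parse import urlparse


def split_gs_uri(uri: str) -> tuple[str, str]:
    """Split 'gs://bucket/path/to/object' into ('bucket', 'path/to/object')."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc or not parsed.path.strip("/"):
        raise ValueError(f"not a gs:// object URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def object_is_readable(keyfile_path: str, uri: str) -> bool:
    """Return True if the object exists, False if it is missing.

    Permission or billing problems (such as the 403 accountDisabled above)
    are raised by the client library as exceptions, not returned as False.
    """
    # Imported lazily so split_gs_uri works without google-cloud-storage installed
    from google.cloud import storage

    bucket_name, blob_name = split_gs_uri(uri)
    client = storage.Client.from_service_account_json(keyfile_path)
    return client.bucket(bucket_name).blob(blob_name).exists()


# Usage (needs network access and valid credentials):
#   object_is_readable(r"C:\Pkey\key.json", file_path)
```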


In [3]:
# Describe the dataset
df.describe().show()


+-------+------------------+------+------------------+-------------+---------------+-----------------+-----------------+------------------+------------------------+------------------+
|summary|       Customer ID|Gender|               Age|         City|Membership Type|      Total Spend|  Items Purchased|    Average Rating|Days Since Last Purchase|Satisfaction Level|
+-------+------------------+------+------------------+-------------+---------------+-----------------+-----------------+------------------+------------------------+------------------+
|  count|               350|   350|               350|          350|            350|              350|              350|               350|                     350|               348|
|   mean|             275.5|  NULL|33.597142857142856|         NULL|           NULL|845.3817142857134|             12.6| 4.019142857142849|      26.588571428571427|              NULL|
| stddev|101.18053172424031|  NULL| 4.870882183548376|         NULL|           N
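The `count` row of `describe()` skips nulls, so the 348 for `Satisfaction Level` against 350 for every other column points to 2 missing values in that column. Counting nulls directly makes this explicit. Below is a plain-Python sketch over row dictionaries, with the corresponding PySpark expression in the docstring; the helper name and sample rows are hypothetical.

```python
def null_counts(rows, columns):
    """Count missing (None) values per column across a list of row dicts.

    PySpark equivalent, assuming F = pyspark.sql.functions:
        df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
    """
    return {c: sum(1 for row in rows if row.get(c) is None) for c in columns}


# Hypothetical sample: the second row is missing its satisfaction level
sample = [
    {"Customer ID": 101, "Satisfaction Level": "Satisfied"},
    {"Customer ID": 102, "Satisfaction Level": None},
]
print(null_counts(sample, ["Customer ID", "Satisfaction Level"]))
# {'Customer ID': 0, 'Satisfaction Level': 1}
```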

In [4]:
# Drop all rows that contain any null values in any column
df = df.dropna()


In [5]:
# Drop rows that are identical in every column
df = df.dropDuplicates()


In [6]:
df.describe().show()

+-------+------------------+------+------------------+-------------+---------------+-----------------+------------------+------------------+------------------------+------------------+
|summary|       Customer ID|Gender|               Age|         City|Membership Type|      Total Spend|   Items Purchased|    Average Rating|Days Since Last Purchase|Satisfaction Level|
+-------+------------------+------+------------------+-------------+---------------+-----------------+------------------+------------------+------------------------+------------------+
|  count|               348|   348|               348|          348|            348|              348|               348|               348|                     348|               348|
|   mean|275.88793103448273|  NULL|33.577586206896555|         NULL|           NULL|847.7931034482757|12.632183908045977| 4.023563218390801|      26.614942528735632|              NULL|
| stddev|101.30461140521022|  NULL|4.8780237566708236|         NULL|       
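Comparing the two `describe()` outputs, every count falls to 348, which matches the 348 non-null `Satisfaction Level` values. So `dropna()` removed the 2 rows with a missing satisfaction level, and `dropDuplicates()` removed nothing. That is expected: with no arguments it only drops rows identical in every column, and `Customer ID` appears to be a unique identifier (customers 151 and 324 in the outputs below differ only in their ID). A plain-Python sketch of deduplicating on a subset of columns, which corresponds to `df.dropDuplicates([c for c in df.columns if c != "Customer ID"])` in PySpark; whether such rows are true duplicates is a judgment call for this dataset, and `drop_duplicates` is a hypothetical helper.

```python
def drop_duplicates(rows, subset):
    """Keep the first row for each distinct combination of the subset columns.

    Similar in spirit to DataFrame.dropDuplicates(subset); Spark itself does
    not guarantee which of the duplicate rows it keeps.
    """
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept
```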

In [7]:
# Segment customers by total spend: High (> 1000), Medium (> 500), Low (otherwise)
from pyspark.sql.functions import when

segmentation_df = df.withColumn("SpendingCategory", 
                                when(df["Total Spend"] > 1000, "High")
                                .when(df["Total Spend"] > 500, "Medium")
                                .otherwise("Low"))

segmentation_df.show()


+-----------+------+---+-------------+---------------+-----------+---------------+--------------+----------------+------------------------+------------------+----------------+
|Customer ID|Gender|Age|         City|Membership Type|Total Spend|Items Purchased|Average Rating|Discount Applied|Days Since Last Purchase|Satisfaction Level|SpendingCategory|
+-----------+------+---+-------------+---------------+-----------+---------------+--------------+----------------+------------------------+------------------+----------------+
|        251|Female| 30|     New York|           Gold|     1180.8|             16|           4.7|            true|                      19|         Satisfied|            High|
|        151|Female| 43|      Chicago|         Bronze|     505.75|             10|           3.3|            true|                      39|       Unsatisfied|          Medium|
|        324|Female| 43|      Chicago|         Bronze|     505.75|             10|           3.3|            true|      
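The `when` chain is evaluated top to bottom and the first true condition wins, so boundary values fall into the lower tier: exactly 1000 is Medium and exactly 500 is Low. A null `Total Spend` would also land in Low, because a comparison with null is not true, although `dropna()` above rules that out here. A plain-Python mirror of the same thresholds, handy for unit-testing the rule without a Spark session; the function name is hypothetical.

```python
def spending_category(total_spend: float) -> str:
    """Mirror of the when/otherwise chain: the first matching condition wins."""
    if total_spend > 1000:
        return "High"
    if total_spend > 500:
        return "Medium"
    return "Low"


# Boundary checks: 1000 is not > 1000, and 500 is not > 500
print(spending_category(1000), spending_category(500))  # Medium Low
```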

In [8]:
# Identify inactive customers (no purchase in the last 30 days)
inactive_customers = segmentation_df.filter(segmentation_df["Days Since Last Purchase"] > 30)

# Identify recent customers (purchased within the last 30 days)
recent_customers = segmentation_df.filter(segmentation_df["Days Since Last Purchase"] <= 30)
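Both filters use the same 30-day threshold with complementary comparisons (`> 30` and `<= 30`), so each customer lands in exactly one group and a customer at exactly 30 days counts as recent. A row with a null `Days Since Last Purchase` would match neither filter, though the column has no nulls here. A plain-Python sketch of the same split for checking the boundary without Spark; `split_by_activity` and `INACTIVITY_DAYS` are hypothetical names.

```python
INACTIVITY_DAYS = 30  # same threshold as the two filters above


def split_by_activity(rows, days_column="Days Since Last Purchase"):
    """Return (inactive, recent): inactive if days > 30, recent if days <= 30."""
    inactive = [r for r in rows if r[days_column] > INACTIVITY_DAYS]
    recent = [r for r in rows if r[days_column] <= INACTIVITY_DAYS]
    return inactive, recent
```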




In [9]:
inactive_customers.show()

+-----------+------+---+--------+---------------+-----------+---------------+--------------+----------------+------------------------+------------------+----------------+
|Customer ID|Gender|Age|    City|Membership Type|Total Spend|Items Purchased|Average Rating|Discount Applied|Days Since Last Purchase|Satisfaction Level|SpendingCategory|
+-----------+------+---+--------+---------------+-----------+---------------+--------------+----------------+------------------------+------------------+----------------+
|        151|Female| 43| Chicago|         Bronze|     505.75|             10|           3.3|            true|                      39|       Unsatisfied|          Medium|
|        324|Female| 43| Chicago|         Bronze|     505.75|             10|           3.3|            true|                      39|       Unsatisfied|          Medium|
|        235|Female| 41| Chicago|         Bronze|     480.25|              9|           3.6|            true|                      38|       Unsa

In [10]:
recent_customers.show()

+-----------+------+---+-------------+---------------+-----------+---------------+--------------+----------------+------------------------+------------------+----------------+
|Customer ID|Gender|Age|         City|Membership Type|Total Spend|Items Purchased|Average Rating|Discount Applied|Days Since Last Purchase|Satisfaction Level|SpendingCategory|
+-----------+------+---+-------------+---------------+-----------+---------------+--------------+----------------+------------------------+------------------+----------------+
|        251|Female| 30|     New York|           Gold|     1180.8|             16|           4.7|            true|                      19|         Satisfied|            High|
|        144|  Male| 35|  Los Angeles|         Silver|      820.9|             12|           4.3|           false|                      12|           Neutral|          Medium|
|        385|  Male| 28|San Francisco|           Gold|     1490.1|             21|           4.9|           false|      

In [11]:
# Write each customer group to GCS as a directory of CSV part files with a header row
output_path = "gs://project_imad/output/inactive_customers"
inactive_customers.write.mode("overwrite").csv(output_path, header=True)

output_path_recent = "gs://project_imad/output/recent_customers"
recent_customers.write.mode("overwrite").csv(output_path_recent, header=True)
