# Migrating from a legacy Hadoop environment to Google Data Cloud

This notebook will guide you through the process of migrating from a legacy Hadoop environment to Google Data Cloud.
It contains the following steps:
1. Deploy a legacy Cloudera cluster.
2. Migrate data from the legacy Hadoop environment to BigQuery.
3. Migrate HIVE workloads to BigQuery.
4. Migrate PySpark workloads to BigQuery Spark.
5. Enable BigQuery Data Governance (data quality, data profiling and data insights).
6. Run a BigQuery Canvas analysis.



Before you start, disable the following organization policies in your project:
- `constraints/compute.vmExternalIpAccess`
- `constraints/compute.requireShieldedVm`
- `constraints/iam.disableServiceAccountKeyCreation`

## Step 1: Deploying a legacy Cloudera cluster on Google Cloud.

From Cloud Shell, clone the TI25-LAB01 repository and run the `deploy-cloudera.sh` script, specifying the project ID, region and zone.
For example:
```bash
# Clone the lab repository and move to the Cloudera deployment scripts.
git clone https://github.com/velascoluis/ti25-lab01.git
cd ti25-lab01/src/lab_01/cdh-deployment
# Arguments: <project_id> <region> <zone> - replace with your own values.
./deploy-cloudera.sh velascoluis-dev-sandbox us-central1 us-central1-a
```

Once the cluster is deployed, connect to the Cloudera container (replace the container ID with the one from your own deployment), for example:
```bash
# SSH into the cluster VM and attach to the Cloudera container.
# NOTE: the container ID below presumably belongs to the author's deployment - replace it with yours.
gcloud compute ssh gce-cdh-5-single-node --zone=us-central1-a --container 39334b3d7c28
```


```bash
# Start the Hive CLI, switch to ccf_db, then list and query its tables.
$> hive
hive> use ccf_db;
hive> show tables;
hive> select * from customers;
```

Now, run a more complex HIVE query:

```sql
hive> WITH CustomerLoanSummary AS (
    -- Summarize loan applications per customer.
    -- The LEFT JOIN keeps customers without applications (total_applications = 0);
    -- the final WHERE clause removes them.
    SELECT 
        c.customer_id,
        c.first_name,
        c.last_name,
        c.life_event,
        COUNT(la.application_id) as total_applications,
        -- UPPER() makes the match independent of how the status is capitalised in the data.
        SUM(CASE WHEN UPPER(la.application_status) = 'APPROVED' THEN 1 ELSE 0 END) as approved_applications,
        SUM(la.loan_amount) as total_loan_amount,
        SUM(la.marketing_cost) as total_marketing_cost,
        AVG(la.loan_amount) as avg_loan_amount
    FROM customers c
    LEFT JOIN loan_applications la ON c.customer_id = la.customer_id
    GROUP BY c.customer_id, c.first_name, c.last_name, c.life_event
),
RepaymentMetrics AS (
    -- Calculate repayment performance metrics per customer.
    -- Inner join: only customers with at least one repayment appear here.
    SELECT 
        la.customer_id,
        COUNT(DISTINCT lr.repayment_id) as total_repayments,
        SUM(lr.amount_paid) as total_amount_paid,
        AVG(lr.days_past_due) as avg_days_past_due,
        SUM(CASE WHEN UPPER(lr.payment_status) = 'LATE' THEN 1 ELSE 0 END) as late_payments
    FROM loan_applications la
    JOIN loan_repayments lr ON la.application_id = lr.loan_id
    GROUP BY la.customer_id
)

SELECT 
    cls.*,
    rm.total_repayments,
    rm.total_amount_paid,
    rm.avg_days_past_due,
    rm.late_payments,
    -- Guard against division by zero; a NULL total also yields NULL.
    ROUND(CASE 
        WHEN cls.total_loan_amount = 0 THEN NULL 
        ELSE (cls.total_marketing_cost / cls.total_loan_amount) * 100 
    END, 2) as marketing_cost_percentage,
    ROUND(CASE 
        WHEN rm.total_repayments = 0 THEN NULL 
        ELSE (rm.late_payments / rm.total_repayments) * 100 
    END, 2) as late_payment_percentage,
    CASE 
        -- Customers with no repayment metrics get NULL from the LEFT JOIN below;
        -- without this branch they would fall through to ELSE 'Poor'.
        WHEN rm.avg_days_past_due IS NULL THEN 'No History'
        WHEN rm.avg_days_past_due = 0 THEN 'Excellent'
        WHEN rm.avg_days_past_due <= 30 THEN 'Good'
        WHEN rm.avg_days_past_due <= 90 THEN 'Fair'
        ELSE 'Poor'
    END as customer_rating
FROM CustomerLoanSummary cls
LEFT JOIN RepaymentMetrics rm ON cls.customer_id = rm.customer_id
WHERE cls.total_applications > 0
ORDER BY cls.total_loan_amount DESC, rm.avg_days_past_due ASC;
```

![HIVE query](./assets/hive.gif)

Now, run the PySpark code to calculate the delinquency rate:
```bash
# Run the delinquency-rate PySpark job; the script is expected in /home/cloudera.
cd /home/cloudera
spark-submit del_rate_calculation.py
```

![PySpark code](./assets/spark.gif)

## Step 2: Migrate data from the legacy Hadoop environment to BigQuery.

Still from the Cloudera container, run the following commands, adapting the bucket name to your project.
First, make sure the Google Cloud Storage bucket is visible from the cluster:
```bash
# Adapt the bucket name to your project.
export BUCKET_NAME=velascoluis-dev-sandbox-cloudera-1737643339
# Listing the bucket confirms that Hadoop can access it.
hadoop fs -ls  gs://${BUCKET_NAME}
```
Then copy the table data from the Hadoop environment to the Cloud Storage bucket:

```bash
# Adapt the bucket name to your project.
export BUCKET_NAME=velascoluis-dev-sandbox-cloudera-1737643339
# Hive database that holds the source tables (previously unset, which broke the hive call below).
export DATABASE_NAME=${DATABASE_NAME:-ccf_db}
for TABLE_NAME in customers loan_applications loan_repayments; do
  echo "Processing table: ${TABLE_NAME}"
  # Take the storage path from the (first) "Location:" row of DESCRIBE FORMATTED.
  FILES_LOCATION=$(hive --database "${DATABASE_NAME}" -S -e "describe formatted ${TABLE_NAME} ;" | awk '/^[[:space:]]*Location:/ { print $NF; exit }')
  if [ -z "${FILES_LOCATION}" ]; then
    # Without this guard the copy source would become "/*" (the filesystem root) and run with -delete.
    echo "ERROR: could not determine the location of ${DATABASE_NAME}.${TABLE_NAME}; skipping" >&2
    continue
  fi
  echo "Source location: ${FILES_LOCATION}"
  echo "Copying to GCS bucket: gs://${BUCKET_NAME}/${TABLE_NAME}/${TABLE_NAME}"
  # Quote the glob so it is expanded by Hadoop against HDFS, not by the local shell.
  hadoop distcp -overwrite -delete "${FILES_LOCATION}/*" "gs://${BUCKET_NAME}/${TABLE_NAME}/${TABLE_NAME}"
done
```


Now, create the BigQuery tables from BigQuery Studio (adapt the project ID and bucket name to your environment):
```sql
-- Dataset for the migrated tables. Its location is pinned to US so that it is
-- co-located with the `us` BigLake connection used by every table below.
CREATE SCHEMA IF NOT EXISTS ccf_db
  OPTIONS (location = 'US');

-- BigLake (external) tables over the Parquet files copied from HDFS with distcp.
CREATE OR REPLACE EXTERNAL TABLE `ccf_db.customers`
 WITH CONNECTION `projects/velascoluis-dev-sandbox/locations/us/connections/biglake-connection`
 OPTIONS (
    format ="PARQUET",
    uris = ['gs://velascoluis-dev-sandbox-cloudera-1737643339/customers/*']);

CREATE OR REPLACE EXTERNAL TABLE `ccf_db.loan_applications`
 WITH CONNECTION `projects/velascoluis-dev-sandbox/locations/us/connections/biglake-connection`
 OPTIONS (
    format ="PARQUET",
    uris = ['gs://velascoluis-dev-sandbox-cloudera-1737643339/loan_applications/*']);

CREATE OR REPLACE EXTERNAL TABLE `ccf_db.loan_repayments`
 WITH CONNECTION `projects/velascoluis-dev-sandbox/locations/us/connections/biglake-connection`
 OPTIONS (
    format ="PARQUET",
    uris = ['gs://velascoluis-dev-sandbox-cloudera-1737643339/loan_repayments/*']);


-- Iceberg copies of the tables. Each INSERT names its columns and CASTs them to the
-- declared types, so loading does not depend on the Parquet column order or on
-- implicit type coercion (e.g. FLOAT64 -> NUMERIC is not implicit).
CREATE OR REPLACE TABLE `ccf_db.customers_iceberg`
(
  customer_id STRING,
  first_name STRING,
  last_name STRING,
  date_of_birth STRING,
  email STRING,
  phone STRING,
  registration_date STRING,
  life_event STRING
)
 WITH CONNECTION `projects/velascoluis-dev-sandbox/locations/us/connections/biglake-connection`
 OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'gs://velascoluis-dev-sandbox-cloudera-1737643339/warehouse/customers/');

INSERT INTO `ccf_db.customers_iceberg`
  (customer_id, first_name, last_name, date_of_birth, email, phone, registration_date, life_event)
SELECT
  CAST(customer_id AS STRING),
  CAST(first_name AS STRING),
  CAST(last_name AS STRING),
  CAST(date_of_birth AS STRING),
  CAST(email AS STRING),
  CAST(phone AS STRING),
  CAST(registration_date AS STRING),
  CAST(life_event AS STRING)
FROM `ccf_db.customers`;


CREATE OR REPLACE TABLE `ccf_db.loan_applications_iceberg`
(
  application_id STRING,
  customer_id STRING,
  application_date STRING,
  product_type STRING,
  loan_amount NUMERIC,
  application_status STRING,
  application_channel STRING,
  marketing_cost NUMERIC,
  approval_date STRING,
  disbursement_date STRING
)
 WITH CONNECTION `projects/velascoluis-dev-sandbox/locations/us/connections/biglake-connection`
 OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'gs://velascoluis-dev-sandbox-cloudera-1737643339/warehouse/loan_applications/');

INSERT INTO `ccf_db.loan_applications_iceberg`
  (application_id, customer_id, application_date, product_type, loan_amount,
   application_status, application_channel, marketing_cost, approval_date, disbursement_date)
SELECT
  CAST(application_id AS STRING),
  CAST(customer_id AS STRING),
  CAST(application_date AS STRING),
  CAST(product_type AS STRING),
  CAST(loan_amount AS NUMERIC),
  CAST(application_status AS STRING),
  CAST(application_channel AS STRING),
  CAST(marketing_cost AS NUMERIC),
  CAST(approval_date AS STRING),
  CAST(disbursement_date AS STRING)
FROM `ccf_db.loan_applications`;

CREATE OR REPLACE TABLE `ccf_db.loan_repayments_iceberg`
(
  repayment_id STRING,
  loan_id STRING,
  repayment_date STRING,
  amount_due NUMERIC,
  amount_paid NUMERIC,
  payment_status STRING,
  days_past_due INTEGER
)
 WITH CONNECTION `projects/velascoluis-dev-sandbox/locations/us/connections/biglake-connection`
 OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'gs://velascoluis-dev-sandbox-cloudera-1737643339/warehouse/loan_repayments/');

INSERT INTO `ccf_db.loan_repayments_iceberg`
  (repayment_id, loan_id, repayment_date, amount_due, amount_paid, payment_status, days_past_due)
SELECT
  CAST(repayment_id AS STRING),
  CAST(loan_id AS STRING),
  CAST(repayment_date AS STRING),
  CAST(amount_due AS NUMERIC),
  CAST(amount_paid AS NUMERIC),
  CAST(payment_status AS STRING),
  CAST(days_past_due AS INT64)
FROM `ccf_db.loan_repayments`;
```


## Step 3: Migrate HIVE workloads to BigQuery

- In BigQuery, enable SQL translation and translate the query from HiveQL (HQL) to GoogleSQL.
- Then explain the change with Generative AI.
- Use Generative AI to adapt the table names to BigQuery by adding the dataset name.
- Materialize the results in a BigQuery table by prefixing the query with: `CREATE OR REPLACE TABLE ccf_db.hive_analysis AS ...`

![HIVE query](./assets/hive.gif)

## Step 4: Migrate PySpark workloads to BigQuery Spark.

From BigQuery Studio, create a new Python notebook and run the following code.

It is the original PySpark job, refactored to read from and write to BigQuery:
```python
from dataproc_spark_session.session.spark.connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Notebook form parameters (`# @param`).
# NOTE(review): not referenced elsewhere in this cell.
project = "velascoluis-dev-sandbox" # @param {type:"string"}
location = "us-central1" # @param {type:"string"}

# Create (or reuse, via getOrCreate) a remote Dataproc Spark Connect session.
session = Session()
spark = (
    DataprocSparkSession.builder
      .appName("sparkSession")
      .remote()
      .dataprocConfig(session)
      .getOrCreate()
)
# Load data from BigQuery tables
loan_apps_df = spark.read.format("bigquery").option("table", "ccf_db.loan_applications").load()
repayments_df = spark.read.format("bigquery").option("table", "ccf_db.loan_repayments").load()
customers_df = spark.read.format("bigquery").option("table", "ccf_db.customers").load()


print("\nNumber of customers: " + str(customers_df.count()))
print("Number of loan applications: " + str(loan_apps_df.count()))
print("Number of repayments: " + str(repayments_df.count()) + "\n")
print("Schema for loan_apps_df:")
loan_apps_df.printSchema()
print()
print("Schema for repayments_df:")
repayments_df.printSchema()
print()
print("Schema for customers_df:")
customers_df.printSchema()


# Keep only approved applications. The comparison is case-insensitive so the filter
# does not depend on how the status is capitalised ('Approved' vs 'APPROVED').
approved_loans = loan_apps_df.filter(F.upper(F.col("application_status")) == "APPROVED")


repayments_alias = repayments_df.alias("repayments")
approved_loans_alias = approved_loans.alias("approved_loans")
customers_alias = customers_df.alias("customers")


# One row per repayment of an approved loan, enriched with product and customer data.
loan_repayments = (
    repayments_alias
    .join(
        approved_loans_alias,
        repayments_alias.loan_id == approved_loans_alias.application_id,
        how="inner"
    )
    .join(
        customers_alias,
        approved_loans_alias.customer_id == customers_alias.customer_id,
        how="inner"
    )
    .select(
        F.col("repayments.repayment_id"),
        F.col("repayments.loan_id"),
        F.col("repayments.repayment_date"),
        F.col("repayments.amount_due"),
        F.col("repayments.amount_paid"),
        F.col("repayments.payment_status"),
        F.col("repayments.days_past_due"),
        F.col("approved_loans.product_type"),
        F.col("customers.first_name"),
        F.col("customers.last_name"),
        F.col("customers.customer_id").alias("customer_id_customers")
    )
)

# Bucket each repayment by days past due; anything unmatched (e.g. NULL) is "Current".
delinquent_loans = loan_repayments.withColumn(
    "delinquency_bucket",
    F.when(F.col("days_past_due") == 0, "Current")
    .when(F.col("days_past_due").between(1, 29), "1-29 Days")
    .when(F.col("days_past_due").between(30, 59), "30-59 Days")
    .when(F.col("days_past_due").between(60, 89), "60-89 Days")
    .when(F.col("days_past_due") >= 90, "90+ Days")
    .otherwise("Current")
)

product_totals = delinquent_loans.groupBy("product_type").agg(
    F.count("*").alias("total_payments")
)

delinquency_counts = delinquent_loans.groupBy(
    "product_type", "delinquency_bucket"
).agg(
    F.count("*").alias("count")
)

# Share (in %) of each product's repayments that fall in each bucket.
delinquency_rates = delinquency_counts.join(
    product_totals, "product_type"
).withColumn(
    "delinquency_rate",
    F.round((F.col("count") / F.col("total_payments")) * 100, 2)
)

# Rank within each (product, bucket) by days past due; customer_id_customers breaks
# ties so the top-3 selection is deterministic across runs.
window_spec = Window.partitionBy("product_type", "delinquency_bucket").orderBy(
    F.desc("days_past_due"), F.asc("customer_id_customers")
)

top_delinquents = (
    delinquent_loans
    .filter(F.col("days_past_due") > 0)
    .select(
        "product_type",
        "delinquency_bucket",
        "days_past_due",
        "first_name",
        "last_name",
        "customer_id_customers"
    )
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(F.col("rank") <= 3)
    .withColumn(
        "customer_name",
        F.concat(F.col("first_name"), F.lit(" "), F.col("last_name"))
    )
)

results = delinquency_rates.orderBy("product_type", "delinquency_bucket").collect()

# Collect the (small: at most 3 rows per product/bucket) top-delinquents table once,
# instead of launching a separate Spark job for every result row.
top_by_group = {}
for cust in top_delinquents.orderBy("product_type", "delinquency_bucket", "rank").collect():
    top_by_group.setdefault((cust.product_type, cust.delinquency_bucket), []).append(cust)

csv_data = []

for row in results:
    base_row = {
        "product_type": row.product_type,
        "delinquency_bucket": row.delinquency_bucket,
        "delinquency_rate": row.delinquency_rate,
    }
    if row.delinquency_bucket != "Current":
        matching_customers = top_by_group.get((row.product_type, row.delinquency_bucket), [])
        for i, cust in enumerate(matching_customers, 1):
            cust_num = f"customer_{i}_"
            base_row[cust_num + "name"] = cust.customer_name
            base_row[cust_num + "id"] = cust.customer_id_customers
            base_row[cust_num + "days_past_due"] = cust.days_past_due
    csv_data.append(base_row)

print("\nDelinquency Rates by Product Type:")
print("===================================")
for row in csv_data:
    print(row)
delinquency_rates.write.format("bigquery").option("writeMethod", "direct").mode("overwrite").save('ccf_db.deliquency_analysis')
```


## Step 5: Enable BigQuery Data Governance

From BigQuery Studio, navigate to the table `ccf_db.deliquency_analysis` and enable the following data governance features:

- Data profile
- Data quality
- Metadata curation
