# Задание
    конфигурируем окружение
    создаем Dataproc (Spark-кластер)
    загружаем датасет (Credit Card Default)
    копируем и анализируем данные в BigQuery
    выгружаем в GCS в формате .parquet при помощи Dataflow, используя шаблон
    загружаем данные при помощи Spark
    работаем с данными через Spark SQL в Jupyter
    Бонус:
    работа с данными через Pandas и визуализация
    В документе более подробное описание задания и комментарии по выполнению.

    Вопросы для самопроверки:

    Вывести количество верно спрогнозированных просрочек
    Вывести медиану кредитного лимита в зависимости от возраста клиента
    Ответы на эти вопросы (в виде выгрузки или скриншотов) нужно вложить в тред.
    Для ответа на эти вопросы потребуется сформировать SQL-запрос(ы) и выполнить его в BigQuery и в Jupyter (Spark SQL), соответственно


In [18]:
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer

# this is the path to the key file which you get from the GCP 
key_path = "C:\\Users\\peaceful-parity-336514-f361713c806a.json"
print(key_path)

credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

C:\Users\peaceful-parity-336514-f361713c806a.json


In [19]:
#Setup the GCP client. 
#Create the object of the client credentials and project_id comes from the key file 
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

In [22]:
# Create Dataset
dataset_name = "GCP_lab_credit_card_dataset"
dataset_id = "{}.GCP_lab_credit_card_dataset".format(client.project, dataset_name)

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(dataset_id)

# TODO(developer): Specify the geographic location where the dataset should reside.
dataset.location = "US"

# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

Created dataset peaceful-parity-336514.GCP_lab_credit_card_dataset


In [10]:
print(dataset.dataset_id)

GCP_lab_credit_card_dataset


In [23]:
#Copy the table into the my project my dataset my table 
# Construct a BigQuery client object.
# TODO(developer): Set table_id to the ID of the destination table.
source_table_path = "bigquery-public-data.ml_datasets.credit_card_default"
table_name = "Credit_card_copy"

table_id = "{}.{}.{}".format(client.project,dataset.dataset_id,table_name)
job_config = bigquery.QueryJobConfig(destination=table_id)

sql = "SELECT * FROM `{}`;".format(source_table_path)

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config)  # Make an API request.
query_job.result()  # Wait for the job to complete.

print("Query results loaded to the table {}".format(table_id))

Query results loaded to the table peaceful-parity-336514.GCP_lab_credit_card_dataset.Credit_card_copy


In [39]:
# Query the dataset table 
query_0 = """
SELECT id, default_payment_next_month, score.tables.score, score.tables.value 
FROM `{}`, unnest(predicted_default_payment_next_month) as score
where (score.tables.score > 0.5);
""".format(table_id)

query_1 = """
SELECT id, default_payment_next_month, score.tables.score, score.tables.value 
FROM `{}`, unnest(predicted_default_payment_next_month) as score
where (score.tables.score > 0.5 and default_payment_next_month = '1');
""".format(table_id)

query_0_job = client.query(query_0)  # Make an API request.
query_1_job = client.query(query_1)  # Make an API request.

result_0 = query_0_job.result()
result_1 = query_1_job.result()
if result_1.total_rows == 0:
    print("The Model accuracy is: 100%")
else:
    Accuracy = (1 - result_1.total_rows/result_0.total_rows) * 100
    print("The Model accuracy is: {}%".format(Accuracy))


The Model accuracy is: 78.58347386172007%


Using the GCP Console:   
>>    Job name:   cloudlab1  
>>    Job region: us-central1  
>>    goog-dataflow-provided-template-name: bigquery_to_parquet  
>>    goog-dataflow-provided-template-type: flex  
>>    tableRef: peaceful-parity-336514:GCP_lab_credit_card_dataset.Credit_card_copy  
>>    bucket: gs://ml_model_lab/lab_1  

In [43]:
# Check the result of the Dataflow
from google.cloud import storage

bucket_name = "ml_model_lab"
storage_client = storage.Client(credentials=credentials, project=credentials.project_id)
blobs = storage_client.list_blobs(bucket_name)
for blob in blobs:
    print(blob.name)


lab_1/output-00000-of-00001.parquet
ml_lab//model.bst
ml_lab/model.bst
model.bst


# Create dataproc cluster
gcloud dataproc clusters create cluster-cloudlab --autoscaling-policy policy-dataproc --enable-component-gateway --region us-central1 --zone us-central1-f --master-machine-type n1-standard-4 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-4 --worker-boot-disk-size 500 --image-version 2.0-debian10 --optional-components JUPYTER --project peaceful-parity-336514

In [None]:
!scala -version

In [None]:
from pyspark.sql import SparkSession

#create spark session 
spark = SparkSession.builder \
    .appName("Final_Lab") \
    .getOrCreate()

#load the datastore (dataset)
datastore = "gs://ml_model_lab/lab_1"
gs_df = spark.read.format("parquet").load(datastore)

#show the first 10 records of data 
gs_df.show(10)

#print Schema
gs_df.printSchema()

#print the median of the limit_balance from age using percentile function 
gs_df.registerTempTable("df")
mdf = spark.sql("select age, percentile_approx(limit_balance, 0.5) as mlimit_balance from df group by age order by age")
mdf.show()

#print the median of the limit_balance from age using sum(limit_balance(age)/count(age))
gs_df.registerTempTable("df_2")
mdf_2 = spark.sql("select age, count(age) as cnt, (sum(limit_balance)/count(age)) as ave_limit_balance from df_2 group by age order by age")
mdf_2.show()

# Test that sum and count is working for 21 age
gs_df.registerTempTable("df_3")
mdf_3 = spark.sql("select age, limit_balance from df_3 where age = '21.0'")
mdf_3.show()

In [None]:
pdf = mdf_2.toPandas()
pdf

In [None]:
pdf.plot("age", "mlimit_balance")

In [21]:
client.delete_dataset(
    dataset_id, delete_contents=True, not_found_ok=True
)  # Make an API request.

print("Deleted dataset '{}'.".format(dataset_id))

Deleted dataset 'peaceful-parity-336514.GCP_lab_credit_card_dataset'.
