In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.sql import SparkSession
# define spark configuration object
spark = SparkSession.builder\
    .appName("HDFS -> GCS & Hive -> GCS Read/Write Usecase 1 & 2") \
    .config("spark.jars", "dbfs:/FileStore/jar/gcs_connector_hadoop2_2_2_7.jar") \
    .enableHiveSupport()\
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

#conf = spark.sparkContext._jsc.hadoopConfiguration()
#conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
#conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
#conf.set("google.cloud.auth.service.account.json.keyfile","/home/hduser/weekday-32-proj-427802-a6-fd822f730117.json")


#Need small fixes to make below work
#Databricks configs
#spark.sparkContext._conf.set("spark.hadoop.fs.gs.auth.service.account.private.key.id","xxx")
#spark.sparkContext._conf.set("spark.hadoop.fs.gs.auth.service.account.private.key","-----BEGIN PRIVATE KEY-----\nxxx\n-----END PRIVATE KEY-----\n")
#spark.sparkContext._conf.set("spark.hadoop.google.cloud.auth.service.account.enable","true")
#spark.sparkContext._conf.set("spark.hadoop.fs.gs.project.id","xxx")
#spark.sparkContext._conf.set("spark.hadoop.fs.gs.auth.service.account.email","xxx")

conf = spark.sparkContext.getConf().getAll()
#print(conf)

print("Usecase #1 - Data Transfer between HDFS/DBFS to GCS and Vice versa")

print("(a) Read from HDFS/DBFS and write to GCS.")
#HDFS
#hdfs_df=spark.read.csv("hdfs://localhost:54310/user/hduser/datatotransfer/")
#DBFS
hdfs_df = spark.read.csv("dbfs:/FileStore/datatotransfer/")
print("HDFS/DBFS Read Completed Successfully")

hdfscnt=hdfs_df.count()
print("hdfscnt: ",hdfscnt)
curts = spark.createDataFrame([1], IntegerType()).withColumn("curts", current_timestamp()).select(date_format(col("curts"), "yyyyMMddHHmmSS")).first()[0]
print(curts)

#Irfan's bucket
#hdfs_df.coalesce(1).write.csv("gs://inceptez-data-store/sumalatha_custdata_"+curts)
#Suma's bucket
hdfs_df.coalesce(1).write.csv("gs://wd32-sumalatha_10/sumalatha1_custdata_"+curts)
print("GCS Write Completed Successfully")


print("(b) Read from GCS and display count in Databricks.")
#gcs_df = spark.read.option("header", "false").option("delimiter", ",").csv("gs://inceptez-data-store/test_custdata_"+curts).toDF("custid","fname","lname","age","profession")
gcs_df = spark.read.option("header", "false").option("delimiter", ",").csv("gs://wd32-sumalatha_10/sumalatha1_custdata_"+curts).toDF("custid","fname","lname","age","profession")
gcs_df.cache()
gcs_df.show(2)
gcscnt=gcs_df.count()
#Reconcilation
print("Reconciliation between DBFS and GCS count....")
if (hdfscnt==gcscnt):
    print("GCS Write Completed Successfully including Data Quality/Reconcilation check completed (equivalent to sqoop --validate)")
else:
    print("Count is not matching - Possibly GCS Write Issue")
    exit(1)

print("Usecase #2 - Data Transfer between Hive/Databricks database tables to GCS and Vice versa")
print("(a) Writing data from GCS to Hive table")
gcs_df.write.mode("overwrite").saveAsTable("default.custs")
print("GCS to Hive Write Completed Successfully")


print("(b) Reading data from hive table")
df_hive=spark.read.table("default.studentrecords")
df_hive.write.json("gs://wd32-sumalatha_10/students_json_"+curts)
print("Hive to GCS Write Completed Successfully")


    

Usecase #1 - Data Transfer between HDFS/DBFS to GCS and Vice versa
(a) Read from HDFS/DBFS and write to GCS.
HDFS/DBFS Read Completed Successfully
hdfscnt:  13
20240717172073
GCS Write Completed Successfully
(b) Read from GCS and display count in Databricks.
+-------+-------+---------+---+--------------------+
| custid|  fname|    lname|age|          profession|
+-------+-------+---------+---+--------------------+
|4009987|   Todd|      Fox| 29|          Politician|
|4009988|Kathryn|McPherson| 28|Human resources a...|
+-------+-------+---------+---+--------------------+
only showing top 2 rows

Reconciliation between DBFS and GCS count....
GCS Write Completed Successfully including Data Quality/Reconcilation check completed (equivalent to sqoop --validate)
Usecase #2 - Data Transfer between Hive/Databricks database tables to GCS and Vice versa
(a) Writing data from GCS to Hive table
GCS to Hive Write Completed Successfully
(b) Reading data from hive table
Hive to GCS Write Completed Su