In [25]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
# Stop existing SparkContext if it exists.
# NOTE(review): stop() tears down the context but, as far as PySpark's gateway
# handling goes, not the driver JVM that was already launched. Jars added to
# `spark.jars` after the first context was created in this kernel are then not
# visible to the driver class loader (symptom: DATA_SOURCE_NOT_FOUND for
# `bigquery`). Restart the kernel after changing the jar list.
if SparkContext._active_spark_context:
    SparkContext._active_spark_context.stop()

# Define GCS credentials and connector paths
credentials_location = "/root/app/Indian-Accident-Analysis-Data-Pipeline/keys/creds.json"
gcs_connector_jar = "/root/gcloud/gcs-connector-hadoop3-latest.jar"
bigquery_connector_jar = "/root/gcloud/spark-bigquery-latest_2.12.jar"

# Fail fast when a required local file is missing. Without this check a wrong
# jar path is not reported here and only shows up later, e.g. as a
# ClassNotFoundException on the first read/write that needs the connector.
missing_paths = [
    path
    for path in (credentials_location, gcs_connector_jar, bigquery_connector_jar)
    if not os.path.isfile(path)
]
if missing_paths:
    raise FileNotFoundError(
        "Required Spark/GCP file(s) not found: " + ", ".join(missing_paths)
    )

# Create Spark configuration: local master using all cores, both connector
# jars on the classpath, and service-account authentication for GCS.
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", f"{gcs_connector_jar},{bigquery_connector_jar}") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

# Create SparkContext
sc = SparkContext(conf=conf)

# Register the GCS connector classes for the gs:// scheme and point the
# connector at the service-account key file.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

# Initialize SparkSession on top of the context created above
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()


your 131072x1 screen size is bogus. expect trouble
25/03/16 07:38:14 WARN Utils: Your hostname, SRCIND-21BQ9G3 resolves to a loopback address: 127.0.1.1; using 172.26.144.30 instead (on interface eth0)
25/03/16 07:38:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/03/16 07:38:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [19]:
# Explicit schema for the state-wise accidents CSV; every column is nullable.
state_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('year', types.IntegerType()),
        ('States_UTs', types.StringType()),
        ('Accidents', types.DoubleType()),
    )
])

In [20]:
# Load the raw state-wise CSV from GCS with the explicit schema; the first
# line of the file is treated as the header row.
df_state = (
    spark.read
    .schema(state_schema)
    .option("header", True)
    .csv("gs://indian_road_accidents-sandbox-449108/state_wise_road_accidents/raw/2019/2019.csv")
)

In [22]:
# Print the DataFrame's column names, types and nullability to confirm the
# explicit schema was applied on read.
df_state.printSchema()

root
 |-- year: integer (nullable = true)
 |-- States_UTs: string (nullable = true)
 |-- Accidents: double (nullable = true)



In [None]:
# Cast the Accidents column to int, then replace any remaining nulls in that
# column with 0 so downstream consumers always see an integer count.
df_state = (
    df_state
    .withColumn("Accidents", F.col("Accidents").cast("int"))
    .fillna({"Accidents": 0})
)

# Preview the first rows before writing anything out
df_state.show(5)

[Stage 8:>                                                          (0 + 1) / 1]

+----+-----------------+---------+
|year|       States_UTs|Accidents|
+----+-----------------+---------+
|2019|   Andhra Pradesh|    21992|
|2019|Arunachal Pradesh|      237|
|2019|            Assam|     8350|
|2019|            Bihar|    10007|
|2019|     Chhattisgarh|    13899|
+----+-----------------+---------+
only showing top 5 rows



                                                                                

25/03/16 08:49:39 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3012811 ms exceeds timeout 120000 ms
25/03/16 08:49:39 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/16 08:49:39 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [None]:
# Persist the cleaned state-wise data as parquet, split into 4 part files.
# The frame is loaded from raw/2019/2019.csv, so it must land in the 2019
# partition folder; writing it to .../parquet/2018/ with mode("overwrite")
# would replace the 2018 output with 2019 rows.
df_state.repartition(4).write \
    .mode("overwrite") \
    .parquet("gs://indian_road_accidents-sandbox-449108/state_wise_road_accidents/parquet/2019/")


                                                                                

In [None]:
# Explicit schema for the deaths-by-collision-type CSV; every column is nullable.
deaths_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('year', types.IntegerType()),
        ('Type of Collision', types.StringType()),
        ('Deaths', types.IntegerType()),
    )
])

In [None]:
# Load the raw deaths CSV from GCS with the explicit schema; the first line
# of the file is treated as the header row.
df_deaths = (
    spark.read
    .schema(deaths_schema)
    .option("header", True)
    .csv("gs://indian_road_accidents-sandbox-449108/deaths_by_road_accidents/raw/2018/2018.csv")
)

In [None]:
# Print the DataFrame's column names, types and nullability to confirm the
# explicit schema was applied on read.
df_deaths.printSchema()

root
 |-- year: integer (nullable = true)
 |-- Type of Collision: string (nullable = true)
 |-- Deaths: integer (nullable = true)



In [None]:
# Persist the deaths data as parquet in 4 part files, replacing any existing
# output under the target folder.
(
    df_deaths
    .repartition(4)
    .write
    .parquet(
        "gs://indian_road_accidents-sandbox-449108/deaths_by_road_accidents/parquet/2018/",
        mode="overwrite",
    )
)


                                                                                

In [None]:
# Name of the GCS bucket registered under the `temporaryGcsBucket` session
# setting.
# NOTE(review): presumably the BigQuery connector stages data here before
# loading it into BigQuery — confirm against the connector documentation.
bucket = "dataproc-temp-us-central1-432775935527-es23nxzn"
spark.conf.set('temporaryGcsBucket', bucket)

In [None]:
# Append the deaths data to the BigQuery table.
# NOTE(review): the recorded run of this cell failed with
# DATA_SOURCE_NOT_FOUND / ClassNotFoundException: bigquery.DefaultSource,
# i.e. the `bigquery` data source could not be resolved in that session.
# Verify the BigQuery connector jar is actually loaded (restart the kernel
# after changing `spark.jars`) before re-running.
(
    df_deaths.write
    .format('bigquery')
    .mode('append')
    .option('table', 'kestra-sandbox-449108.indian_road_accidents.people_killed_by_accident_type')
    .save()
)


Py4JJavaError: An error occurred while calling o951.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: bigquery. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more
