## Install Libraries and Setup Driver Manager

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.functions import col

In [2]:
pyspark.__version__

'4.0.0'

In [3]:
spark = SparkSession.builder \
                    .appName('pipeline movie data') \
                    .getOrCreate()

In [4]:
spark

## Read Data

In [5]:
df_ratings = spark.read.option("Header", "True").csv('ratings.csv')

In [6]:
df_ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
|     1|   1968|   4.0|1425942148|
|     1|   2762|   4.5|1425941300|
|     1|   2918|   5.0|1425941593|
|     1|   2959|   4.0|1425941601|
|     1|   4226|   4.0|1425942228|
|     1|   4878|   5.0|1425941434|
|     1|   5577|   5.0|1425941397|
|     1|  33794|   4.0|1425942005|
|     1|  54503|   3.5|1425941313|
|     1|  58559|   4.0|1425942007|
|     1|  59315|   5.0|1425941502|
|     1|  68358|   5.0|1425941464|
|     1|  69844|   5.0|1425942139|
|     1|  73017|   5.0|1425942699|
|     1|  81834|   5.0|1425942133|
+------+-------+------+----------+
only showing top 20 rows


In [7]:
df_metadata = spark.read.option("header", "true").csv('metadata/movies_metadata.csv')

In [8]:
df_metadata.show()

+-----+---------------------+---------+--------------------+--------------------+------+---------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+---------+--------------------+--------------------+--------------------+--------------------+------+------------+----------+
|adult|belongs_to_collection|   budget|              genres|            homepage|    id|  imdb_id|original_language|      original_title|            overview|          popularity|         poster_path|production_companies|production_countries|        release_date|   revenue|  runtime|    spoken_languages|              status|             tagline|               title| video|vote_average|vote_count|
+-----+---------------------+---------+--------------------+--------------------+------+---------+-----------------+--------------------+--------------------+--------------------+--------------------+

In [9]:
df_metadata.show(truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------+------+---------+-----------------+--------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
df_ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [11]:
df_metadata.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

## Data Preprocessing

In [12]:
joined_df = df_ratings.join(df_metadata, df_ratings.movieId == df_metadata.id, how='inner')

In [13]:
joined_df.show(1)

+------+-------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+-----+------------+----------+
|userId|movieId|rating| timestamp|adult|belongs_to_collection|budget|              genres|homepage|    id|  imdb_id|original_language|original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|video|vote_average|vote_count|
+------+-------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---

In [14]:
rename_cols = {
    'userId': 'user_id',
    'movieId': 'movie_id'
}

In [15]:
joined_df = joined_df.withColumnsRenamed(colsMap=rename_cols)

In [16]:
joined_df.show(1)

+-------+--------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+-----+------------+----------+
|user_id|movie_id|rating| timestamp|adult|belongs_to_collection|budget|              genres|homepage|    id|  imdb_id|original_language|original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|video|vote_average|vote_count|
+-------+--------+------+----------+-----+---------------------+------+--------------------+--------+------+---------+-----------------+--------------+--------------------+----------+--------------------+--------------------+--------------------+----------

In [17]:
joined_df.columns

['user_id',
 'movie_id',
 'rating',
 'timestamp',
 'adult',
 'belongs_to_collection',
 'budget',
 'genres',
 'homepage',
 'id',
 'imdb_id',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

In [18]:
selected_columns = [
    'user_id',
    'movie_id',
    'rating',
    'timestamp',
    'adult',
    'budget',
    'genres',
    'original_language',
    'original_title',
    'overview',
    'popularity',
    'production_companies',
    'release_date',
    'revenue',
    'runtime',
    'spoken_languages',
    'status',
    'tagline',
    'title',
    'vote_average',
    'vote_count'
 ]

In [19]:
joined_df = joined_df.select(selected_columns)

In [20]:
joined_df.show(1)

+-------+--------+------+----------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|user_id|movie_id|rating| timestamp|adult|budget|              genres|original_language|original_title|            overview|popularity|production_companies|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|vote_average|vote_count|
+-------+--------+------+----------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|    429|  100010|   1.0|1475231879|False|     0|[{'id': 18, 'name...|               en|Flight Command|A rookie flyer, E...|  0.769266|[{'name': 'Metro-...|  1940-12-27|   

In [21]:
joined_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- movie_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)



In [22]:
casting_cols = {
    "user_id": "int",
    "movie_id": "int",
    "budget": "float",
    "popularity": "float",
    "revenue": "float",
    "runtime": "float",
    "vote_average": "float",
    "vote_count": "float"
}

In [23]:
for col_name, col_type in casting_cols.items():
    joined_df = joined_df.withColumn(col_name, joined_df[col_name].cast(col_type))

In [24]:
# SAMA SEPERTI CELL DI ATAS
# for col_name, col_type in casting_cols.items():
#     joined_df = joined_df.withColumn(col_name, joined_df[col_name].cast(col_type))

In [25]:
joined_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- budget: float (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: float (nullable = true)
 |-- runtime: float (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: float (nullable = true)



In [26]:
from pyspark.sql.functions import from_unixtime

joined_df.show(1)

+-------+--------+------+----------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|user_id|movie_id|rating| timestamp|adult|budget|              genres|original_language|original_title|            overview|popularity|production_companies|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|vote_average|vote_count|
+-------+--------+------+----------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|    429|  100010|   1.0|1475231879|False|   0.0|[{'id': 18, 'name...|               en|Flight Command|A rookie flyer, E...|  0.769266|[{'name': 'Metro-...|  1940-12-27|   

In [27]:
# convert unix time to timestamp

joined_df = joined_df.withColumn('timestamp', from_unixtime('timestamp').alias('ts'))

In [28]:
joined_df.show(1)

+-------+--------+------+-------------------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|user_id|movie_id|rating|          timestamp|adult|budget|              genres|original_language|original_title|            overview|popularity|production_companies|release_date|revenue|runtime|    spoken_languages|  status|             tagline|         title|vote_average|vote_count|
+-------+--------+------+-------------------+-----+------+--------------------+-----------------+--------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------+------------+----------+
|    429|  100010|   1.0|2016-09-30 17:37:59|False|   0.0|[{'id': 18, 'name...|               en|Flight Command|A rookie flyer, E...|  0.769266|[

In [29]:
joined_df.count()

10880835

In [30]:
# temp = joined_df.select('*')

In [31]:
# filter data: release_date >= 2010-01-01 and timestamp >= 2017-01-01 00:00:00

joined_df = joined_df.filter("release_date >= '2010-01-01' AND timestamp >= '2017-01-01 00:00:00'")
joined_df.show(1)

+-------+--------+------+-------------------+-----+--------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|user_id|movie_id|rating|          timestamp|adult|  budget|              genres|original_language|      original_title|            overview|popularity|production_companies|release_date|revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|
+-------+--------+------+-------------------+-----+--------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|    837|  120478|   5.0|2017-08-03 20:05:49|False|115000.0|[{'id': 27, 'name...|               en|The 

In [32]:
joined_df.count()

47700

In [33]:
joined_df = joined_df.withColumn('profit', joined_df['revenue']-joined_df['budget'])

In [34]:
joined_df.show(1)

+-------+--------+------+-------------------+-----+--------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------------+------------+----------+---------+
|user_id|movie_id|rating|          timestamp|adult|  budget|              genres|original_language|      original_title|            overview|popularity|production_companies|release_date|revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|   profit|
+-------+--------+------+-------------------+-----+--------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+------------+-------+-------+--------------------+--------+--------------------+--------------------+------------+----------+---------+
|    837|  120478|   5.0|2017-08-03 20:05:49|False|115000.0|[{'id': 27, '

In [35]:
joined_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- budget: float (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: float (nullable = true)
 |-- runtime: float (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: float (nullable = true)
 |-- profit: float (nullable = true)



## Load to Postgres

In [110]:
DB_USER = 'postgres'
DB_PASS = 'postgres'

url = f"jdbc:postgresql://movies_db:5439/moviedb"

properties = {
    'user': DB_USER,
    'password': DB_PASS
}

joined_df.write.jdbc(url=url, table='rating_movies', mode='overwrite', properties=properties)

Py4JJavaError: An error occurred while calling o708.jdbc.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:300)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:118)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:118)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:272)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:276)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:55)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:88)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:126)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:334)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:300)
		at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:118)
		at scala.Option.getOrElse(Option.scala:201)
		at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:118)
		at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:272)
		at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:276)
		at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
		at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:55)
		at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
		at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)
		at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:88)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
		at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
		at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
		at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
		at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
		at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
		at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
		at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
		at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
		at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
		at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
		at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
		at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 20 more
