In [None]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create (or reuse) the notebook's SparkSession. The previous
# `.config('spark', 3000)` call set a key that is not a Spark property and had
# no effect, so it is replaced by an explicit application name, which makes the
# job easy to identify in the Spark UI and logs.
spark = SparkSession.builder.appName('sparkify-debugging').getOrCreate()

In [None]:
# Relative path to the small Sparkify log sample used throughout the notebook.
path = 'data/sparkify_log_small.json'
# Read the JSON file into a Spark DataFrame; later cells refer to it as `logs`.
logs = spark.read.json(path)

In [3]:
# Peek at the first record to see which columns and values the log contains.
logs.take(1)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')]

## A misspelled column name results in a long analysis exception
## AnalysisException

    ---------------------------------------------------------------------------
    AnalysisException                         Traceback (most recent call last)
    Input In [4], in <cell line: 1>()
    ----> 1 logs.select(['UserId','firstName','page', 'son']).where(logs.UserId == '1046')

    AnalysisException: Column 'son' does not exist. Did you mean one of the following? [song, ts, auth, page, gender, length, level, method, status, userId, artist, location, sessionId, lastName, userAgent, firstName, registration, itemInSession];

In [4]:
# Intentional error: 'son' is a misspelling of the 'song' column, so Spark
# raises an AnalysisException for this select.
logs.select(['UserId','firstName','page', 'son']).where(logs.UserId == '1046')

AnalysisException: Column 'son' does not exist. Did you mean one of the following? [song, ts, auth, page, gender, length, level, method, status, userId, artist, location, sessionId, lastName, userAgent, firstName, registration, itemInSession];
'Project [UserId#25, firstName#10, page#18, 'son]
+- Relation [artist#8,auth#9,firstName#10,gender#11,itemInSession#12L,lastName#13,length#14,level#15,location#16,method#17,page#18,registration#19L,sessionId#20L,song#21,status#22L,ts#23L,userAgent#24,userId#25] json


## A typo in a method name will generate a short attribute error
## AttributeError

    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    Input In [5], in <cell line: 2>()
          1 # a column write wrong
    ----> 2 logs.select(['UserId','firstName','page', 'song']).Where(logs.UserId == '1046')

    AttributeError: 'DataFrame' object has no attribute 'Where'


In [6]:
# Intentional error: the DataFrame method is `where`, not `Where`. Python
# attribute names are case-sensitive, so this raises an AttributeError.
logs.select(['UserId','firstName','page', 'song']).Where(logs.UserId == '1046')

AttributeError: 'DataFrame' object has no attribute 'Where'

## Typos in variables can result in lengthy errors
## NameError

    ---------------------------------------------------------------------------
    NameError                                 Traceback (most recent call last)
    Input In [9], in <cell line: 2>()
          1 # Use only lowercase in method names. An underscore should separate words in a method name.
    ----> 2 logs.select(['UserId','firstName','page', 'song']).where(log.UserId == '1046')

    NameError: name 'log' is not defined

In [4]:
# Intentional error: the DataFrame variable is `logs`; `log` is not defined in
# this notebook, so Python raises a NameError.
logs.select(['UserId','firstName','page', 'song']).where(log.UserId == '1046')

NameError: name 'log' is not defined

## While Spark provides a Python API, Spark itself is written in Scala and runs on the JVM. That's why some error messages refer to Scala, Java, or JVM issues even when we are running Python code.

ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 5)
java.lang.OutOfMemoryError: Java heap space

In [10]:
# Append 'x' to every artist name. In Spark SQL, `+` on a string column is
# arithmetic (the operands are cast to numbers), so `logs.artist + 'x'` does not
# concatenate; concat() with a literal does. Rows whose artist is NULL stay
# NULL.
logs2 = logs.withColumn('artist', F.concat(logs.artist, F.lit('x')))

In [11]:
# Intentional error: cross-joining the log with itself pairs every row with
# every row, and collect() asks for the entire result at once. In the recorded
# run this failed with java.lang.OutOfMemoryError: Java heap space.
logs.crossJoin(logs).collect()

[Stage 3:>                                                          (0 + 2) / 2]

22/09/23 05:52:42 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 5)
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$2727/875419026.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3369, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/var/folders/dc/4nb76k213rg53clfvtj522h00000gn/T/ipykernel_33050/820303775.py", line 1, in <cell line: 1>
    logs.crossJoin(logs).collect()
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJav

ConnectionRefusedError: [Errno 61] Connection refused

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 54] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/lucianogalvao/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [5]:
# Same cross join, but take(5) only requests the first five rows instead of
# the whole result. The joined rows carry every column twice, once per side.
logs.crossJoin(logs).take(5)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046'),
 Row(artist='Showaddywaddy', auth='Logged In', firstName='Kennet