In [1]:
# Point this kernel at the Spark installation under /opt/spark so that pyspark can be imported.
import findspark

findspark.init(spark_home="/opt/spark")

In [2]:
import pyspark
# Display the path of the imported pyspark package to see which installation is in use.
pyspark.__file__

'/opt/spark/python/pyspark/__init__.py'

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook. The two Hadoop options
# select S3AFileSystem for s3a:// paths and name the AWS SDK default credentials
# provider chain as the credentials provider, so no keys are set in this cell.
spark = (
    SparkSession.builder
    .appName("Blackjack")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/12 20:50:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/12 20:51:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/12/12 20:51:00 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/12/12 20:51:00 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [4]:
# Load the raw hands CSV from S3: the first line provides the column names and
# Spark infers each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("s3a://blackjack-900-ujjwal/raw/blkjckhands.csv")
)

25/12/12 20:51:05 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [6]:
# Cleaning the data: remove the "_c0" column from the raw frame.
# NOTE(review): "_c0" is presumably the unnamed row-index column of the CSV — confirm.
cleaned_df = df.drop("_c0")

In [7]:
# Show the first 5 rows (untruncated), then print the schema.
cleaned_df.show(n=5, truncate=False)
cleaned_df.printSchema()

+--------+-----+-----+-----+-----+-----+----------+---------+---------+---------+---------+---------+---------+------+-------+-----------+----------+--------+--------+-----------+
|PlayerNo|card1|card2|card3|card4|card5|sumofcards|dealcard1|dealcard2|dealcard3|dealcard4|dealcard5|sumofdeal|blkjck|winloss|plybustbeat|dlbustbeat|plwinamt|dlwinamt|ply2cardsum|
+--------+-----+-----+-----+-----+-----+----------+---------+---------+---------+---------+---------+---------+------+-------+-----------+----------+--------+--------+-----------+
|Player1 |7    |10   |0    |0    |0    |17        |10       |8        |0        |0        |0        |18       |nowin |Loss   |Beat       |Dlwin     |0       |10      |17         |
|Player2 |10   |9    |0    |0    |0    |19        |10       |8        |0        |0        |0        |18       |nowin |Win    |Plwin      |Beat      |20      |0       |19         |
|Player3 |9    |8    |0    |0    |0    |17        |10       |8        |0        |0        |0        

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when,expr

In [9]:
# Add playerTotalCards: the number of player card slots (card1..card5) whose
# value is greater than 0. Each comparison is cast to int and the five are summed.
player_card_count_sql = " + ".join(f"int(card{slot} > 0)" for slot in range(1, 6))
cleaned_df = cleaned_df.withColumn("playerTotalCards", expr(player_card_count_sql))

In [10]:
# Add dealerTotalCards: the number of dealer card slots (dealcard1..dealcard5)
# whose value is greater than 0. Each comparison is cast to int and the five are summed.
dealer_card_count_sql = " + ".join(f"int(dealcard{slot} > 0)" for slot in range(1, 6))
cleaned_df = cleaned_df.withColumn("dealerTotalCards", expr(dealer_card_count_sql))


In [11]:
# Spot-check the derived player card count on the first 5 rows.
cleaned_df.select("playerTotalCards").show(n=5)

+----------------+
|playerTotalCards|
+----------------+
|               2|
|               2|
|               2|
|               3|
|               3|
+----------------+
only showing top 5 rows



In [12]:
# Spot-check the derived dealer card count on the first 5 rows.
cleaned_df.select("dealerTotalCards").show(n=5)

+----------------+
|dealerTotalCards|
+----------------+
|               2|
|               2|
|               2|
|               2|
|               2|
+----------------+
only showing top 5 rows



In [13]:
# Show the first 5 rows and the schema again, now including the two card-count columns.
cleaned_df.show(n=5)
cleaned_df.printSchema()

+--------+-----+-----+-----+-----+-----+----------+---------+---------+---------+---------+---------+---------+------+-------+-----------+----------+--------+--------+-----------+----------------+----------------+
|PlayerNo|card1|card2|card3|card4|card5|sumofcards|dealcard1|dealcard2|dealcard3|dealcard4|dealcard5|sumofdeal|blkjck|winloss|plybustbeat|dlbustbeat|plwinamt|dlwinamt|ply2cardsum|playerTotalCards|dealerTotalCards|
+--------+-----+-----+-----+-----+-----+----------+---------+---------+---------+---------+---------+---------+------+-------+-----------+----------+--------+--------+-----------+----------------+----------------+
| Player1|    7|   10|    0|    0|    0|        17|       10|        8|        0|        0|        0|       18| nowin|   Loss|       Beat|     Dlwin|       0|      10|         17|               2|               2|
| Player2|   10|    9|    0|    0|    0|        19|       10|        8|        0|        0|        0|       18| nowin|    Win|      Plwin|      

In [14]:
# Average final card sum for the player (sumofcards) and the dealer (sumofdeal)
# across all rows.
avg_final_sums = cleaned_df.agg(
    F.avg("sumofcards").alias("avg_player_sum"),
    F.avg("sumofdeal").alias("avg_dealer_sum"),
)
avg_final_sums.show()



+-----------------+------------------+
|   avg_player_sum|    avg_dealer_sum|
+-----------------+------------------+
|18.89752222222222|20.298706666666668|
+-----------------+------------------+



                                                                                

In [16]:
# Top 10 players by total winnings: sum plwinamt per PlayerNo, sort from the
# highest total down, and keep at most 10 rows.
earnings_by_player = cleaned_df.groupBy("PlayerNo").agg(
    F.sum("plwinamt").alias("total_player_earnings")
)
top_players = (
    earnings_by_player
    .orderBy(F.col("total_player_earnings").desc())
    .limit(10)
)

top_players.show(truncate=False)



+--------+---------------------+
|PlayerNo|total_player_earnings|
+--------+---------------------+
|Player6 |1462085              |
|Player1 |1460415              |
|Player4 |1458430              |
|Player2 |1458320              |
|Player5 |1456025              |
|Player3 |1455900              |
+--------+---------------------+



                                                                                

In [17]:
# Player win rate: the mean of a 1/0 flag that is 1 when winloss is "Win".
player_win_flag = F.when(F.col("winloss") == "Win", 1).otherwise(0)
win_rate = cleaned_df.agg(F.avg(player_win_flag).alias("player_win_rate"))

win_rate.show()


[Stage 13:>                                                         (0 + 2) / 2]

+-------------------+
|    player_win_rate|
+-------------------+
|0.42882888888888887|
+-------------------+



                                                                                

In [18]:
# Dealer win rate: the mean of a 1/0 flag that is 1 when winloss is "Loss"
# (a loss for the player). Every other outcome, including "Push", counts as 0.
dealer_win_flag = F.when(F.col("winloss") == "Loss", 1).otherwise(0)
dealer_win_rate = cleaned_df.agg(F.avg(dealer_win_flag).alias("dealer_win_rate"))

dealer_win_rate.show()

[Stage 16:>                                                         (0 + 2) / 2]

+------------------+
|   dealer_win_rate|
+------------------+
|0.4775788888888889|
+------------------+



                                                                                

In [19]:
# Blackjack frequency: the count and the share of rows whose ply2cardsum equals 21.
two_card_21_flag = F.when(F.col("ply2cardsum") == 21, 1).otherwise(0)
blackjack_freq = cleaned_df.agg(
    F.sum(two_card_21_flag).alias("blackjack_count"),
    F.avg(two_card_21_flag).alias("blackjack_rate"),
)

blackjack_freq.show()

[Stage 19:>                                                         (0 + 2) / 2]

+---------------+-------------------+
|blackjack_count|     blackjack_rate|
+---------------+-------------------+
|          43061|0.04784555555555556|
+---------------+-------------------+



                                                                                

In [20]:
# Player bust rate: the share of rows where the player's final sum (sumofcards) exceeds 21.
player_bust_flag = F.when(F.col("sumofcards") > 21, 1).otherwise(0)
player_bust_rate = cleaned_df.agg(F.avg(player_bust_flag).alias("player_bust_rate"))
player_bust_rate.show()


[Stage 22:>                                                         (0 + 2) / 2]

+----------------+
|player_bust_rate|
+----------------+
|          0.1787|
+----------------+



                                                                                

In [21]:
# Dealer bust rate: the share of rows where the dealer's final sum (sumofdeal) exceeds 21.
dealer_bust_flag = F.when(F.col("sumofdeal") > 21, 1).otherwise(0)
dealer_bust_rate = cleaned_df.agg(F.avg(dealer_bust_flag).alias("dealer_bust_rate"))

dealer_bust_rate.show()

[Stage 25:>                                                         (0 + 2) / 2]

+-------------------+
|   dealer_bust_rate|
+-------------------+
|0.28113333333333335|
+-------------------+



                                                                                

In [22]:
# Persist the cleaned frame (including the derived card-count columns) to S3 as
# CSV with a header row, replacing any previous output at that path.
(
    cleaned_df.write
    .mode("overwrite")
    .option("header", "true")
    .csv("s3a://blackjack-900-ujjwal/processed/cleaned_blackjack_csv")
)

25/12/12 20:53:18 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/12/12 20:53:19 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/12/12 20:53:19 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [23]:
# Register the cleaned frame as the temp view "blackjack" so the spark.sql
# queries in the following cells can refer to it by name.
cleaned_df.createOrReplaceTempView("blackjack")

In [24]:
# The single most frequent final player total (sumofcards) and how many rows have it.
most_common_total = spark.sql("""
    SELECT 
        sumofcards AS final_total,
        COUNT(*) AS frequency
    FROM blackjack
    GROUP BY sumofcards
    ORDER BY frequency DESC
    LIMIT 1
""")
most_common_total.show()



+-----------+---------+
|final_total|frequency|
+-----------+---------+
|         20|   146800|
+-----------+---------+



                                                                                

In [25]:
# Average number of dealer cards (dealerTotalCards) for each winloss outcome.
dealer_cards_by_outcome = spark.sql("""
    SELECT 
        winloss,
        AVG(dealerTotalCards) AS avg_cards_drawn
    FROM blackjack
    GROUP BY winloss
""")
dealer_cards_by_outcome.show()



+-------+------------------+
|winloss|   avg_cards_drawn|
+-------+------------------+
|    Win| 3.104846273831054|
|   Loss| 2.767821953790066|
|   Push|2.5955029501501787|
+-------+------------------+



                                                                                

In [26]:
# Player win rate (the share of rows with winloss = 'Win') for each player card count.
win_rate_by_card_count = spark.sql("""
    SELECT 
        playerTotalCards,
        AVG(CASE WHEN winloss = 'Win' THEN 1 ELSE 0 END) AS win_rate
    FROM blackjack
    GROUP BY playerTotalCards
    ORDER BY playerTotalCards
""")
win_rate_by_card_count.show()



+----------------+------------------+
|playerTotalCards|          win_rate|
+----------------+------------------+
|               2|0.5266137068855588|
|               3|0.3692415043097333|
|               4|0.2728524520707552|
|               5|0.2520575555679973|
+----------------+------------------+



                                                                                

In [27]:
# Average player win amount over rows with plwinamt > 0, grouped by the blkjck
# value and listed from the highest average down.
avg_win_by_type = spark.sql("""
    SELECT 
        blkjck AS win_type,
        AVG(plwinamt) AS avg_win_amount
    FROM blackjack
    WHERE plwinamt > 0
    GROUP BY blkjck
    ORDER BY avg_win_amount DESC
""")
avg_win_by_type.show()

[Stage 38:>                                                         (0 + 2) / 2]

+--------+-----------------+
|win_type|   avg_win_amount|
+--------+-----------------+
|     Win|23.23181068716472|
|   nowin| 18.1467182371148|
+--------+-----------------+



                                                                                

In [28]:
# Row count for each ply2cardsum value, ordered by that value.
two_card_sum_counts = spark.sql("""
    SELECT 
        ply2cardsum,
        COUNT(*) AS count
    FROM blackjack
    GROUP BY ply2cardsum
    ORDER BY ply2cardsum
""")
two_card_sum_counts.show()



+-----------+-----+
|ply2cardsum|count|
+-----------+-----+
|          2| 1599|
|          3| 4051|
|          4|10177|
|          5|16647|
|          6|22319|
|          7|28740|
|          8|26470|
|          9|31937|
|         10|37374|
|         11|43022|
|         12|83944|
|         13|80827|
|         14|75133|
|         15|68970|
|         16|62396|
|         17|56456|
|         18|58619|
|         19|53158|
|         20|95100|
|         21|43061|
+-----------+-----+



                                                                                