# Ex3 - Getting and Knowing your Data

Check out the [Occupation Exercises Video Tutorial](https://www.youtube.com/watch?v=W8AB5s-L3Rw&list=PLgJhDSE2ZLxaY_DigHeiIDC1cD09rXgJv&index=4) to watch a data scientist go through the exercises.

This time we are going to pull data directly from the internet.
Special thanks to: https://github.com/justmarkham for sharing the dataset and materials.

### Step 1. Import the necessary libraries

In [1]:
from functools import reduce

import pyspark.sql.functions as F
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()
spark



### Step 2. Import the dataset from this [address](https://raw.githubusercontent.com/justmarkham/DAT8/master/data/u.user). 

In [2]:
data_path = "https://raw.githubusercontent.com/justmarkham/DAT8/master/data/u.user"
spark.sparkContext.addFile(data_path)

### Step 3. Assign it to a variable called users and use the 'user_id' as index

In [3]:
# Spark DataFrames have no index, so user_id stays an ordinary column.
users = spark.read.csv(f"file:///{SparkFiles.get('u.user')}", sep="|", header=True)
users.show(5)
users.printSchema()

                                                                                

+-------+---+------+----------+--------+
|user_id|age|gender|occupation|zip_code|
+-------+---+------+----------+--------+
|      1| 24|     M|technician|   85711|
|      2| 53|     F|     other|   94043|
|      3| 23|     M|    writer|   32067|
|      4| 24|     M|technician|   43537|
|      5| 33|     F|     other|   15213|
+-------+---+------+----------+--------+
only showing top 5 rows

root
 |-- user_id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)
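Every column comes back as `string` because the reader was given no schema. As a rough plain-Python analogy (not Spark code), the standard-library `csv` module behaves the same way on a pipe-delimited file with a header row. The three sample rows are copied from the output above; `sample`, `rows` and `field_types` are names made up for this sketch.

```python
import csv
import io

# Three rows copied from the output above, in the file's pipe-delimited layout.
sample = (
    "user_id|age|gender|occupation|zip_code\n"
    "1|24|M|technician|85711\n"
    "2|53|F|other|94043\n"
    "3|23|M|writer|32067\n"
)

rows = list(csv.DictReader(io.StringIO(sample), delimiter="|"))

# Without an explicit conversion every field is a str, like the schema above.
field_types = {name: type(value).__name__ for name, value in rows[0].items()}
print(field_types)
```

This is why the next cell casts `user_id` and `age` explicitly.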



In [4]:
users = users.withColumns(
    {
        "user_id": F.col("user_id").cast(T.IntegerType()),
        "age": F.col("age").cast(T.IntegerType()),
        # gender, occupation and zip_code are already strings. zip_code has to
        # stay a string: some codes have leading zeros or contain letters.
    }
)
users.show(3)
users.printSchema()

+-------+---+------+----------+--------+
|user_id|age|gender|occupation|zip_code|
+-------+---+------+----------+--------+
|      1| 24|     M|technician|   85711|
|      2| 53|     F|     other|   94043|
|      3| 23|     M|    writer|   32067|
+-------+---+------+----------+--------+
only showing top 3 rows

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)
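Only `user_id` and `age` are cast; `zip_code` is left as a string. A small plain-Python sketch of what an integer conversion would do to zip codes that appear in this notebook's outputs. `try_int` is a hypothetical helper written for this illustration. It mimics what I would expect from a Spark integer cast under default (non-ANSI) settings, where an unparseable string becomes NULL, but that mapping is an assumption, not something shown in the notebook.

```python
# Zip codes copied from outputs in this notebook.
zip_codes = ["85711", "05201", "Y1A6B"]


def try_int(text):
    """Return int(text), or None when the text is not a valid integer."""
    try:
        return int(text)
    except ValueError:
        return None


as_ints = [try_int(z) for z in zip_codes]
print(as_ints)  # [85711, 5201, None]
```

The leading zero of `05201` is lost and `Y1A6B` has no integer form at all, so a string column is the safer choice.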



### Step 4. See the first 25 entries

In [5]:
users.show(25)

+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|      1| 24|     M|   technician|   85711|
|      2| 53|     F|        other|   94043|
|      3| 23|     M|       writer|   32067|
|      4| 24|     M|   technician|   43537|
|      5| 33|     F|        other|   15213|
|      6| 42|     M|    executive|   98101|
|      7| 57|     M|administrator|   91344|
|      8| 36|     M|administrator|   05201|
|      9| 29|     M|      student|   01002|
|     10| 53|     M|       lawyer|   90703|
|     11| 39|     F|        other|   30329|
|     12| 28|     F|        other|   06405|
|     13| 47|     M|     educator|   29206|
|     14| 45|     M|    scientist|   55106|
|     15| 49|     F|     educator|   97301|
|     16| 21|     M|entertainment|   10309|
|     17| 30|     M|   programmer|   06355|
|     18| 35|     F|        other|   37212|
|     19| 40|     M|    librarian|   02138|
|     20| 42|     F|    homemake

### Step 5. See the last 10 entries

In [6]:
# Sorting by user_id descending lists the last 10 users, highest id first.
users.orderBy(F.desc("user_id")).show(10)

+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|    943| 22|     M|      student|   77841|
|    942| 48|     F|    librarian|   78209|
|    941| 20|     M|      student|   97229|
|    940| 32|     M|administrator|   02215|
|    939| 26|     F|      student|   33319|
|    938| 38|     F|   technician|   55038|
|    937| 48|     M|     educator|   98072|
|    936| 24|     M|        other|   32789|
|    935| 42|     M|       doctor|   66221|
|    934| 61|     M|     engineer|   22902|
+-------+---+------+-------------+--------+
only showing top 10 rows
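Note that the rows above appear in reverse order compared with what pandas' `tail(10)` would print. A plain-Python sketch of the difference, using a hypothetical list of ids 1..943 as a stand-in for the `user_id` column:

```python
# Hypothetical stand-in for the user_id column: ids 1..943.
user_ids = list(range(1, 944))

# Last ten entries in their original order: 934, 935, ..., 943.
last_ten_in_order = user_ids[-10:]

# First ten entries after a descending sort: 943, 942, ..., 934.
top_ten_descending = sorted(user_ids, reverse=True)[:10]

print(last_ten_in_order)
print(top_ten_descending)
```

Both hold the same ten ids, only in opposite order. PySpark 3.0 and later also offers `DataFrame.tail(10)`, which returns the last rows as a list of `Row` objects in their original order, if that matters for your use case.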



### Step 6. What is the number of observations in the dataset?

In [7]:
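# Total number of rows.
print(f"{users.count() = }")

# Sanity check: every user_id should appear exactly once,
# so the result below is expected to be empty.
(
    users.groupBy("user_id")
    .agg(F.count("user_id").alias("count"))
    .where(F.col("count") != 1)
).show()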



users.count() = 943
+-------+-----+
|user_id|count|
+-------+-----+
+-------+-----+
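The empty table means that no `user_id` occurs more than once, so the row count equals the number of distinct users. The same check sketched in plain Python with `collections.Counter`; the ids are hypothetical and contain one deliberate duplicate so that the result is not empty:

```python
from collections import Counter

# Hypothetical ids; 3 is duplicated on purpose to show a non-empty result.
ids = [1, 2, 3, 3, 4]

counts = Counter(ids)
# Keep only the ids that do not occur exactly once.
duplicates = {value: n for value, n in counts.items() if n != 1}
print(duplicates)  # {3: 2}
```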



### Step 7. What is the number of columns in the dataset?

In [8]:
len(users.columns)

5

### Step 8. Print the name of all the columns.

In [9]:
users.columns

['user_id', 'age', 'gender', 'occupation', 'zip_code']

### Step 9. How is the dataset indexed?

In [10]:
# "the index" (aka "the labels")
# Spark DataFrames have no index; user_id is an ordinary column and rows carry no labels.

### Step 10. What is the data type of each column?

In [11]:
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)



### Step 11. Print only the occupation column

In [12]:
# user_id is included next to occupation as a stand-in for the pandas index.
users.select("user_id", "occupation").show()

+-------+-------------+
|user_id|   occupation|
+-------+-------------+
|      1|   technician|
|      2|        other|
|      3|       writer|
|      4|   technician|
|      5|        other|
|      6|    executive|
|      7|administrator|
|      8|administrator|
|      9|      student|
|     10|       lawyer|
|     11|        other|
|     12|        other|
|     13|     educator|
|     14|    scientist|
|     15|     educator|
|     16|entertainment|
|     17|   programmer|
|     18|        other|
|     19|    librarian|
|     20|    homemaker|
+-------+-------------+
only showing top 20 rows



### Step 12. How many different occupations are in this dataset?

In [13]:
users.select(F.count_distinct("occupation").alias("occupation_count")).show()

+----------------+
|occupation_count|
+----------------+
|              21|
+----------------+



### Step 13. What is the most frequent occupation?

In [14]:
(
    users.groupBy("occupation")
    .agg(F.count("occupation").alias("count"))
    .orderBy(F.desc("count"))
    .limit(1)
).show()

+----------+-----+
|occupation|count|
+----------+-----+
|   student|  196|
+----------+-----+



In [15]:
(
    users.groupBy("occupation")
    .agg(F.count("occupation").alias("count"))
    .orderBy(F.desc("count"))
    .limit(1)
    .collect()[0]["count"]
)

196
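One caveat with `.limit(1)`: it returns a single row, so if two occupations shared the highest count only one of them would be shown. A plain-Python sketch of a tie-aware version, using a hypothetical sample in which two occupations are tied on purpose:

```python
from collections import Counter

# Hypothetical sample; "student" and "other" are tied on purpose.
occupations = ["student", "other", "student", "writer", "other"]

counts = Counter(occupations)
top_count = max(counts.values())
# Collect every occupation that reaches the top count, not just the first one.
top_occupations = sorted(name for name, n in counts.items() if n == top_count)
print(top_count, top_occupations)  # 2 ['other', 'student']
```

Step 18 in this notebook applies the same two-pass idea (find the extreme count, then filter on it) to the least frequent age.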

### Step 14. Summarize the DataFrame.

In [16]:
summary = users.select(
    *[
        op("age").alias(name)
        for op, name in [
            (F.count, "count"),
            (F.avg, "avg"),
            (F.std, "std"),
            (F.min, "min"),
            (lambda col: F.percentile(col, 0.25), "25%"),
            (lambda col: F.percentile(col, 0.5), "50%"),
            (lambda col: F.percentile(col, 0.75), "75%"),
            (F.max, "max"),
        ]
    ]
)
summary.show()
summary.collect()[0].asDict()

+-----+-----------------+-----------------+---+----+----+----+---+
|count|              avg|              std|min| 25%| 50%| 75%|max|
+-----+-----------------+-----------------+---+----+----+----+---+
|  943|34.05196182396607|12.19273973305903|  7|25.0|31.0|43.0| 73|
+-----+-----------------+-----------------+---+----+----+----+---+



{'count': 943,
 'avg': 34.05196182396607,
 'std': 12.19273973305903,
 'min': 7,
 '25%': 25.0,
 '50%': 31.0,
 '75%': 43.0,
 'max': 73}

In [17]:
users.select("age").summary().show()

+-------+-----------------+
|summary|              age|
+-------+-----------------+
|  count|              943|
|   mean|34.05196182396607|
| stddev|12.19273973305903|
|    min|                7|
|    25%|               25|
|    50%|               31|
|    75%|               43|
|    max|               73|
+-------+-----------------+
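The two summaries agree, except that the hand-built one prints the quartiles as `25.0`, `31.0`, `43.0` and `summary()` prints `25`, `31`, `43`. A likely explanation, stated here as an assumption: `F.percentile` computes an exact percentile with linear interpolation and returns a double, while `summary()` reports approximate percentiles that are actual values from the column. `F.std` is, to my knowledge, the sample standard deviation (n - 1 in the denominator). The sketch below shows both calculations in plain Python; `percentile_linear` is a hypothetical helper and the ages are made up.

```python
import math
import statistics


def percentile_linear(values, p):
    """Exact percentile with linear interpolation between the two closest ranks."""
    ordered = sorted(values)
    position = (len(ordered) - 1) * p
    lower = math.floor(position)
    upper = math.ceil(position)
    fraction = position - lower
    return ordered[lower] + fraction * (ordered[upper] - ordered[lower])


# Hypothetical ages, not taken from the dataset.
ages = [20, 30, 40, 50]

quartiles = [percentile_linear(ages, p) for p in (0.25, 0.5, 0.75)]
sample_std = statistics.stdev(ages)  # sample standard deviation (n - 1)
print(quartiles, sample_std)
```

With interpolation a quartile can fall between two observed values (27.5 here), which is why the result has to be a floating-point number.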



### Step 15. Summarize all the columns

In [18]:
dfs = [
    (
        users.select(
            F.count(col).alias("count"),
            F.count_distinct(col).alias("unique"),
            F.mode(col).alias("top"),
            F.count_if(
                F.col(col)
                == (
                    users.groupBy(col)
                    .agg(F.count(col).alias("count"))
                    .orderBy(F.desc("count"))
                    .collect()[0][col]
                )
            ).alias("freq"),
            F.avg(col).alias("avg"),
            F.std(col).alias("std"),
            F.min(col).alias("min"),
            F.percentile(col, 0.25).alias("25%"),
            F.percentile(col, 0.5).alias("50%"),
            F.percentile(col, 0.75).alias("75%"),
            F.max(col).alias("max"),
        ).withColumn("field", F.lit(col))
    )
    for col in ["age", "gender", "occupation", "zip_code"]
]

reduce(lambda a, b: a.union(b), dfs).select(
    "field", *[col for col in dfs[0].columns if col != "field"]
).show()

+----------+-----+------+-------+----+-----------------+------------------+-------------+-------+-------+-------+------+
|     field|count|unique|    top|freq|              avg|               std|          min|    25%|    50%|    75%|   max|
+----------+-----+------+-------+----+-----------------+------------------+-------------+-------+-------+-------+------+
|       age|  943|    61|     30|  39|34.05196182396607|12.192739733059032|            7|   25.0|   31.0|   43.0|    73|
|    gender|  943|     2|      M| 670|             NULL|              NULL|            F|   NULL|   NULL|   NULL|     M|
|occupation|  943|    21|student| 196|             NULL|              NULL|administrator|   NULL|   NULL|   NULL|writer|
|  zip_code|  943|   795|  55414|   9|50868.78810810811|30891.373254138154|        00000|21227.0|53711.0|78741.0| Y1A6B|
+----------+-----+------+-------+----+-----------------+------------------+-------------+-------+-------+-------+------+
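The `zip_code` row deserves a second look. Its `min` and `max` (`00000` and `Y1A6B`) are string comparisons, and its `avg`, `std` and percentiles are numbers even though the column is a string. My reading, which is an assumption about Spark's default (non-ANSI) behavior rather than something the notebook states, is that numeric aggregates implicitly cast strings to numbers, turn unparseable values into NULL and then skip them. That would also explain the `NULL` results for `gender` and `occupation`. A plain-Python sketch of both effects, using zip codes that appear in this notebook's outputs:

```python
# Zip codes copied from outputs in this notebook.
zip_codes = ["85711", "05201", "00000", "Y1A6B"]

# min/max on strings compare character by character, not numerically.
smallest, largest = min(zip_codes), max(zip_codes)

# A numeric average can only use the values that parse as numbers.
numeric = [float(z) for z in zip_codes if z.isdigit()]
mean_of_numeric = sum(numeric) / len(numeric)

print(smallest, largest, len(numeric), mean_of_numeric)
```

Statistics such as the mean of a zip code carry no real meaning, so for identifier-like columns the `count`, `unique`, `top` and `freq` fields are the useful ones.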



### Step 16. Summarize only the occupation column

In [19]:
(
    users.select(
        F.count("occupation").alias("count"),
        F.count_distinct("occupation").alias("unique"),
        F.mode("occupation").alias("top"),
        F.count_if(
            F.col("occupation")
            == (
                users.groupBy("occupation")
                .agg(F.count("occupation").alias("count"))
                .orderBy(F.desc("count"))
                .limit(1)
                .collect()[0]["occupation"]
            )
        ).alias("freq"),
    )
).show()

+-----+------+-------+----+
|count|unique|    top|freq|
+-----+------+-------+----+
|  943|    21|student| 196|
+-----+------+-------+----+



### Step 17. What is the mean age of users?

In [20]:
users.select(F.avg("age")).show()

+-----------------+
|         avg(age)|
+-----------------+
|34.05196182396607|
+-----------------+



### Step 18. What is the age with least occurrence?

In [21]:
grouped_age = (
    users.groupBy("age").agg(F.count("age").alias("count")).orderBy(F.asc("count"))
)
grouped_age.show(6)

lowest_count = grouped_age.select(F.min("count").alias("min")).collect()[0]["min"]
print(lowest_count)

# Reuse grouped_age instead of recomputing the aggregation.
(
    grouped_age.where(F.col("count") == lowest_count)
    .orderBy(F.asc("age"))
).show()

+---+-----+
|age|count|
+---+-----+
|  7|    1|
| 10|    1|
| 73|    1|
| 11|    1|
| 66|    1|
| 64|    2|
+---+-----+
only showing top 6 rows

1
+---+-----+
|age|count|
+---+-----+
|  7|    1|
| 10|    1|
| 11|    1|
| 66|    1|
| 73|    1|
+---+-----+
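Five ages tie for the lowest count, which is why the cell first finds the minimum count and then filters on it instead of taking a single row. The same two-pass logic sketched in plain Python with hypothetical ages:

```python
from collections import Counter

# Hypothetical ages; 7 and 73 each occur once, so they tie for least frequent.
ages = [7, 24, 24, 30, 30, 30, 73]

counts = Counter(ages)
lowest_count = min(counts.values())
# Keep every age whose count equals the minimum, sorted for stable output.
least_common_ages = sorted(age for age, n in counts.items() if n == lowest_count)
print(lowest_count, least_common_ages)  # 1 [7, 73]
```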

