# Import and create Spark session

In [1]:
import hashlib

from pyspark import SparkConf
from pyspark.sql import Row, SparkSession, Window
from pyspark.sql.functions import (avg, broadcast, coalesce, col, concat,
                                   count, countDistinct, current_timestamp,
                                   date_format, dense_rank, lit, lower, md5,
                                   rand, round, sum, to_date, udf, upper, when)
from pyspark.sql.types import (DateType, DecimalType, DoubleType, IntegerType,
                               LongType, ShortType, StringType, StructField,
                               StructType, TimestampType)

In [2]:
conf = SparkConf()
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", False)
conf.set("spark.sql.broadcastTimeout", "2400")
# conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# conf.set("spark.executor.memory", "8g")

<pyspark.conf.SparkConf at 0x108514bd0>

In [3]:
spark = (
    SparkSession.builder.config(conf=conf)
    .appName("spark-jobs")
    .enableHiveSupport()
    .getOrCreate()
)

24/08/31 10:21:11 WARN Utils: Your hostname, MayM1.local resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
24/08/31 10:21:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/31 10:21:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
log4j = spark._jvm.org.apache.log4j
logger = log4j.LogManager.getLogger(__name__)

In [5]:
spark.version

'3.5.2'

# Reading sample data

In [5]:
products_df = spark.read.parquet("./data/products_parquet")
# sales_df = spark.read.parquet("./data/sales_parquet")
# sellers_df = spark.read.parquet("./data/sellers_parquet")

                                                                                

# Discover the data

In [6]:
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)



In [7]:
products_df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
+----------+------------+-----+
only showing top 5 rows



                                                                                

In [8]:
products_df.count()

                                                                                

75000000

In [9]:
sellers_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)



In [10]:
sellers_df.show(5)

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|      257237|
|        2|   seller_2|      754188|
|        3|   seller_3|      310462|
|        4|   seller_4|     1532808|
+---------+-----------+------------+
only showing top 5 rows



In [11]:
sellers_df.count()

10

In [6]:
sales_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [7]:
sales_df.show(5)

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|
+--------+----------+---------+----------+---------------+--------------------+
only showing top 5 rows



[Stage 2:>                                                          (0 + 1) / 1]                                                                                

In [11]:
sales_df.count()

ConnectionRefusedError: [Errno 61] Connection refused

In [9]:
sales_df.groupBy(col("seller_id")) \
.agg(count("*").alias("cnt")) \
.orderBy(col("cnt").desc()).show()



+---------+--------+
|seller_id|     cnt|
+---------+--------+
|        0|19000000|
|        9|  111392|
|        3|  111328|
|        6|  111318|
|        2|  111233|
|        4|  111168|
|        7|  111040|
|        8|  110882|
|        5|  110874|
|        1|  110805|
+---------+--------+



                                                                                

# Warm up exercies

##   Output how many products have been actually sold at least once

In [15]:
print("Number of products sold at least once")
sales_df.agg(countDistinct(col("product_id"))).show()
sales_df.select(countDistinct("product_id")).show()

Number of products sold at least once


                                                                                

+-----------------+
|count(product_id)|
+-----------------+
|           993429|
+-----------------+

+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                    993429|
+--------------------------+





##  Output which is the product that has been sold in more orders

In [16]:
print("Product present in more orders")
a_df = sales_df.groupBy(col("product_id")) \
.agg(count("*").alias("cnt")) \
.where(col("cnt") > 1) \
.orderBy(col("cnt").desc()) \
.limit(3)

Product present in more orders


In [17]:
a_df.show()



+----------+--------+
|product_id|     cnt|
+----------+--------+
|         0|19000000|
|  31136332|       3|
|  32602520|       3|
+----------+--------+



                                                                                

In [18]:
a_df.collect()[0]

Row(product_id='0', cnt=19000000)

In [17]:
print("Number of products in more than 1 order")
sales_df.groupBy(col("product_id")).agg(count("*").alias("cnt")).where(
    col("cnt") > 1
).orderBy(col("cnt").desc()).count()

Number of products in more than 1 order


6582

## How many distinct products have been sold in each day?

In [18]:
sales_df.groupBy(col("date")).agg(
    countDistinct("product_id").alias("product_count")
).orderBy(col("date").desc()).show()



+----------+-------------+
|      date|product_count|
+----------+-------------+
|2020-07-10|        98973|
|2020-07-09|       100501|
|2020-07-08|        99662|
|2020-07-07|        99756|
|2020-07-06|       100765|
|2020-07-05|        99796|
|2020-07-04|        99791|
|2020-07-03|       100017|
|2020-07-02|        99807|
|2020-07-01|       100337|
+----------+-------------+



                                                                                

# Exercies

## 1. What is the average revenue of the orders?

In [19]:
sales_df.join(
    products_df, sales_df["product_id"] == products_df["product_id"], "inner"
).agg(avg(products_df["price"] * sales_df["num_pieces_sold"])).show()



+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1246.1338560822878|
+------------------------------+



                                                                                

### A skewed join, where one task takes a long time to execute since the join is skewed on a very small number of keys (in this case, product_id = 0)
Let’s fix this issue using a technique known as “key salting”. Using the new “salted” key will un-skew the join:
- Duplicate the entries that we have in the dimension table for the most common products, e.g. product_0 will be replicated creating the IDs: product_0–1, product_0–2, product_0–3 and so on.
- On the sales table, we are going to replace “product_0” with a random replica (e.g. some of them will be replaced with product_0–1, others with product_0–2, etc.).

#### Step 1 - Check and select the skewed keys 
In this case we are retrieving the top 100 keys: these will be the only salted keys.

In [13]:
results = (
    sales_df.groupby(sales_df["product_id"])
    .count()
    .sort(col("count").desc())
    .limit(5)
    .collect()
)
results

                                                                                

[Row(product_id='0', count=19000000)]

#### Step 2 - What we want to do is:
1. Duplicate the entries that we have in the dimension table for the most common products, e.g. product_0 will become: product_0-1, product_0-2, product_0-3 and so on
2. On the sales table, we are going to replace "product_0" with a random duplicate (e.g. some of them will be replaced with product_0-1, others with product_0-2, etc.)

In [7]:
REPLICATION_FACTOR = 101
l = []
replicated_products = []
for _r in results:
    replicated_products.append(_r["product_id"])
    for _rep in range(0, REPLICATION_FACTOR):
        l.append((_r["product_id"], _rep))
rdd = spark.sparkContext.parallelize(l)
replicated_df = rdd.map(lambda x: Row(product_id=x[0], replication=int(x[1])))
replicated_df = spark.createDataFrame(replicated_df)

In [8]:
replicated_df.show(5)

+----------+-----------+
|product_id|replication|
+----------+-----------+
|         0|          0|
|         0|          1|
|         0|          2|
|         0|          3|
|         0|          4|
+----------+-----------+
only showing top 5 rows



####   Step 3: Generate the salted key

In [9]:
products_df = products_df.join(
    broadcast(replicated_df),
    products_df["product_id"] == replicated_df["product_id"],
    "left",
).withColumn(
    "salted_join_key",
    when(replicated_df["replication"].isNull(), products_df["product_id"]).otherwise(
        concat(replicated_df["product_id"], lit("-"), replicated_df["replication"])
    ),
)

In [10]:
products_df.show(5)

+----------+------------+-----+----------+-----------+---------------+
|product_id|product_name|price|product_id|replication|salted_join_key|
+----------+------------+-----+----------+-----------+---------------+
|         0|   product_0|   22|         0|        100|          0-100|
|         0|   product_0|   22|         0|         99|           0-99|
|         0|   product_0|   22|         0|         98|           0-98|
|         0|   product_0|   22|         0|         97|           0-97|
|         0|   product_0|   22|         0|         96|           0-96|
+----------+------------+-----+----------+-----------+---------------+
only showing top 5 rows



In [11]:
sales_df = sales_df.withColumn(
    "salted_join_key",
    when(
        sales_df["product_id"].isin(replicated_products),
        concat(
            sales_df["product_id"],
            lit("-"),
            round(rand() * (REPLICATION_FACTOR - 1), 0).cast(IntegerType()),
        ),
    ).otherwise(sales_df["product_id"]),
)

In [12]:
sales_df.show(5)

+--------+----------+---------+----------+---------------+--------------------+---------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|salted_join_key|
+--------+----------+---------+----------+---------------+--------------------+---------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|           0-86|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|           0-74|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|           0-30|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|           0-22|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|           0-23|
+--------+----------+---------+----------+---------------+--------------------+---------------+
only showing top 5 rows



####  Step 4: Finally let's do the join

In [13]:
sales_df.join(
    products_df, sales_df["salted_join_key"] == products_df["salted_join_key"], "inner"
).agg(avg(products_df["price"] * sales_df["num_pieces_sold"])).show()



+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1246.1338560822878|
+------------------------------+





## 2. For each seller, what is the average % contribution of an order to the seller's daily quota?

### Using broadcast for sellers_df
- “Broadcasting” simply means that a copy of the table is sent to every executor, allowing to “localize” the task
- When we broadcast a table, we need to be sure that this will not become too-big-to-broadcast in the future, otherwise we’ll start to have Out Of Memory errors later in time

In [14]:
sales_df.join(
    broadcast(sellers_df), sales_df["seller_id"] == sellers_df["seller_id"], "inner"
).withColumn("ratio", sales_df["num_pieces_sold"] / sellers_df["daily_target"]).groupBy(
    sales_df["seller_id"]
).agg(
    avg("ratio")
).show()



+---------+--------------------+
|seller_id|          avg(ratio)|
+---------+--------------------+
|        0|2.019885898946922...|
|        7|2.595228787788170...|
|        3| 1.62888537056594E-4|
|        8|9.213030375408861E-5|
|        5|4.211073965904022E-5|
|        6|4.782147194369122E-5|
|        9|3.837913136180238E-5|
|        1|1.964233366461014...|
|        4|3.296428039825817E-5|
|        2|6.690408001060484E-5|
+---------+--------------------+





## 3. Who are the second most selling and the least selling persons (sellers) for each product? Who are those for product with `product_id = 0`

### 3.1. Calcuate the number of pieces sold by each seller for each product. Because in sales_df there are different date

In [35]:
total_sales_df = sales_df.groupby(col("product_id"), col("seller_id")).agg(
    sum("num_pieces_sold").alias("total_num_pieces_sold")
)
total_sales_df.show()



+----------+---------+---------------------+
|product_id|seller_id|total_num_pieces_sold|
+----------+---------+---------------------+
|  46681458|        9|                 33.0|
|  19609336|        5|                 56.0|
|  70025887|        4|                 58.0|
|  46825944|        9|                 14.0|
|  27408703|        4|                 90.0|
|  14623915|        9|                 68.0|
|  26750979|        3|                 79.0|
|  43530510|        8|                 19.0|
|  26670637|        3|                 24.0|
|  41883875|        8|                 58.0|
|  40929610|        6|                 68.0|
|  62842664|        3|                 63.0|
|  53319290|        6|                 27.0|
|  31356937|        7|                 21.0|
|  39749661|        4|                 76.0|
|  73912193|        3|                 27.0|
|  52715543|        5|                 71.0|
|  41020103|        7|                 41.0|
|  59834452|        6|                 52.0|
|  1666366



### 3.2. Create the window functions, one will sort ascending the other one descending. Partition by the product_id and sort by the pieces sold

In [47]:
window_desc = Window.partitionBy(col("product_id")).orderBy(
    col("total_num_pieces_sold").desc()
)
window_asc = Window.partitionBy(col("product_id")).orderBy(
    col("total_num_pieces_sold").asc()
)

### 3.3. Create a Dense Rank (to avoid holes)

In [50]:
total_sales_df = total_sales_df.withColumn(
    "rank_asc", dense_rank().over(window_asc)
).withColumn("rank_desc", dense_rank().over(window_desc))

In [67]:
total_sales_df.show(5)



+----------+---------+---------------------+--------+---------+
|product_id|seller_id|total_num_pieces_sold|rank_asc|rank_desc|
+----------+---------+---------------------+--------+---------+
|  10000047|        9|                 29.0|       1|        1|
|  10000712|        7|                 59.0|       1|        1|
|  10000715|        6|                 51.0|       1|        1|
|  10001485|        3|                 12.0|       1|        1|
|  10001689|        5|                 40.0|       1|        1|
+----------+---------+---------------------+--------+---------+
only showing top 5 rows



                                                                                

In [62]:
total_sales_df.orderBy(col("rank_desc").desc(), col("product_id")).show(3)



+----------+---------+---------------------+--------+---------+
|product_id|seller_id|total_num_pieces_sold|rank_asc|rank_desc|
+----------+---------+---------------------+--------+---------+
|  10978356|        7|                 27.0|       1|        3|
|  14542470|        5|                  3.0|       1|        3|
|  17944574|        8|                 15.0|       1|        3|
+----------+---------+---------------------+--------+---------+
only showing top 3 rows



                                                                                

In [63]:
total_sales_df.groupBy(col("product_id")).agg(
    count("seller_id").alias("seller_count")
).where(col("seller_count") > 1).orderBy(col("seller_count").desc()).limit(3).show()



+----------+------------+
|product_id|seller_count|
+----------+------------+
|  20774718|           3|
|  72017876|           3|
|  14542470|           3|
+----------+------------+



                                                                                

In [65]:
total_sales_df.filter(col("product_id") == "20774718").show()

+----------+---------+---------------------+--------+---------+
|product_id|seller_id|total_num_pieces_sold|rank_asc|rank_desc|
+----------+---------+---------------------+--------+---------+
|  20774718|        1|                 88.0|       3|        1|
|  20774718|        3|                 16.0|       2|        2|
|  20774718|        9|                  5.0|       1|        3|
+----------+---------+---------------------+--------+---------+



### 3.4. Get products that only have one row OR the products in which multiple sellers sold the same amount (i.e. all the employees that ever sold the product, sold the same exact amount)

In [68]:
single_seller_df = total_sales_df.where(col("rank_asc") == col("rank_desc")).select(
    col("product_id").alias("single_seller_product_id"),
    col("seller_id").alias("single_seller_seller_id"),
    lit("Only seller or multiple sellers with the same results").alias("type"),
)
single_seller_df.show(5)



+------------------------+-----------------------+--------------------+
|single_seller_product_id|single_seller_seller_id|                type|
+------------------------+-----------------------+--------------------+
|                10000047|                      9|Only seller or mu...|
|                10000712|                      7|Only seller or mu...|
|                10000715|                      6|Only seller or mu...|
|                10001485|                      3|Only seller or mu...|
|                10001689|                      5|Only seller or mu...|
+------------------------+-----------------------+--------------------+
only showing top 5 rows



                                                                                

### 3.5. Get the second top sellers

In [69]:
second_seller_df = total_sales_df.where(col("rank_desc") == 2).select(
    col("product_id").alias("second_seller_product_id"),
    col("seller_id").alias("second_seller_seller_id"),
    lit("Second top seller").alias("type"),
)
second_seller_df.show(5)



+------------------------+-----------------------+-----------------+
|second_seller_product_id|second_seller_seller_id|             type|
+------------------------+-----------------------+-----------------+
|                 1015908|                      8|Second top seller|
|                10277686|                      4|Second top seller|
|                10282881|                      7|Second top seller|
|                10646888|                      8|Second top seller|
|                10669968|                      3|Second top seller|
+------------------------+-----------------------+-----------------+
only showing top 5 rows



                                                                                

### 3.6. Get the least sellers and exclude those rows that are already included in the first piece. We also exclude the "second top sellers" that are also "least sellers"

In [70]:
least_seller_df = (
    total_sales_df.where(col("rank_asc") == 1)
    .select(col("product_id"), col("seller_id"), lit("Least Seller").alias("type"))
    .join(
        single_seller_df,
        (total_sales_df["seller_id"] == single_seller_df["single_seller_seller_id"])
        & (
            total_sales_df["product_id"] == single_seller_df["single_seller_product_id"]
        ),
        "left_anti",
    )
    .join(
        second_seller_df,
        (total_sales_df["seller_id"] == second_seller_df["second_seller_seller_id"])
        & (
            total_sales_df["product_id"] == second_seller_df["second_seller_product_id"]
        ),
        "left_anti",
    )
)

In [71]:
least_seller_df.show()

                                                                                

+----------+---------+------------+
|product_id|seller_id|        type|
+----------+---------+------------+
|  19986717|        1|Least Seller|
|  40496308|        5|Least Seller|
|  52606213|        7|Least Seller|
|   3534470|        3|Least Seller|
|  14542470|        5|Least Seller|
|  28592106|        5|Least Seller|
|  61475460|        7|Least Seller|
|  17944574|        8|Least Seller|
|  35669461|        4|Least Seller|
|  72017876|        1|Least Seller|
|  34681047|        5|Least Seller|
|  56011040|        5|Least Seller|
|  67723231|        5|Least Seller|
|  32602520|        9|Least Seller|
|  69790381|        5|Least Seller|
|  18182299|        7|Least Seller|
|  31136332|        9|Least Seller|
|  57735075|        9|Least Seller|
|  10978356|        7|Least Seller|
|  36269838|        8|Least Seller|
+----------+---------+------------+
only showing top 20 rows



### 3.7. Union all the pieces

In [72]:
total_sales_union_df = (
    least_seller_df.select(col("product_id"), col("seller_id"), col("type"))
    .union(
        second_seller_df.select(
            col("second_seller_product_id").alias("product_id"),
            col("second_seller_seller_id").alias("seller_id"),
            col("type"),
        )
    )
    .union(
        single_seller_df.select(
            col("single_seller_product_id").alias("product_id"),
            col("single_seller_seller_id").alias("seller_id"),
            col("type"),
        )
    )
)
total_sales_union_df.show()

                                                                                

+----------+---------+------------+
|product_id|seller_id|        type|
+----------+---------+------------+
|  19986717|        1|Least Seller|
|  14542470|        5|Least Seller|
|  28592106|        5|Least Seller|
|  40496308|        5|Least Seller|
|  52606213|        7|Least Seller|
|  61475460|        7|Least Seller|
|  17944574|        8|Least Seller|
|  72017876|        1|Least Seller|
|   3534470|        3|Least Seller|
|  35669461|        4|Least Seller|
|  34681047|        5|Least Seller|
|  56011040|        5|Least Seller|
|  67723231|        5|Least Seller|
|  32602520|        9|Least Seller|
|  69790381|        5|Least Seller|
|  10978356|        7|Least Seller|
|  18182299|        7|Least Seller|
|  36269838|        8|Least Seller|
|  20774718|        9|Least Seller|
|  31136332|        9|Least Seller|
+----------+---------+------------+
only showing top 20 rows





### 3.8. Which are the second top seller and least seller of product 20774718?

In [75]:
total_sales_union_df.where(col("product_id") == 20774718).show()

+----------+---------+--------------------+
|product_id|seller_id|                type|
+----------+---------+--------------------+
|  20774718|        9|        Least Seller|
|  20774718|        3|   Second top seller|
|  20774718|        3|Only seller or mu...|
+----------+---------+--------------------+



## 4. Create a new column called "hashed_bill" defined as follows:
- if the order_id is even: apply MD5 hashing iteratively to the bill_raw_text field, once for each 'A' (capital 'A') present in the text. E.g. if the bill text is 'nbAAnllA', you would apply hashing three times iteratively (only if the order number is even)
- if the order_id is odd: apply SHA256 hashing to the bill text
- Finally, check if there are any duplicate on the new column

### 4.1. Define the UDF function

In [6]:
def algo(order_id, bill_text):
    #   If number is even
    ret = bill_text.encode("utf-8")
    if int(order_id) % 2 == 0:
        #   Count number of 'A'
        cnt_A = bill_text.count("A")
        for _c in range(0, cnt_A):
            ret = hashlib.md5(ret).hexdigest().encode("utf-8")
        ret = ret.decode("utf-8")
    else:
        ret = hashlib.sha256(ret).hexdigest()
    return ret

### 4.2. Register the UDF function.

In [7]:
algo_udf = spark.udf.register("algo", algo)

### 4.3. Use the `algo_udf` to apply the aglorithm and then check if there is any duplicate hash in the table

In [None]:
sales_df.withColumn(
    "hashed_bill", algo_udf(col("order_id"), col("bill_raw_text"))
).groupby(col("hashed_bill")).agg(count("*").alias("cnt")).where(col("cnt") > 1).show()

23/04/19 15:20:01 ERROR Utils: Uncaught exception in thread stdout writer for python3
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
	at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1773)
	at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:953)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:909)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1016)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase$ParquetRowGroupReaderImpl.readNextRowGroup(SpecificParquetRecordReaderBase.java:274)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetR

23/04/19 15:20:02 ERROR Utils: Uncaught exception in thread stdout writer for python3
java.lang.OutOfMemoryError: Java heap space
Exception in thread "stdout writer for python3" java.lang.OutOfMemoryError: Java heap space
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:02 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 1

23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:06 WARN TaskMemoryManager: Failed to allocate a page (2097136 bytes), try again.
23/04/19 15:20:07 WARN TaskMemoryManager