# Install lib

In [1]:
!pip install kafka-python
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from kafka import KafkaConsumer
import json
import pandas as pd

In [3]:
data = []

In [4]:
# use this function to print the json with the indent=4
def jprint(data):
  print(json.dumps(data,indent=4))

# Kafka Consumer

In [5]:
consumer = KafkaConsumer(
    '2019',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

print("Starting the consumer...")

for message in consumer:
    data.append(message.value)
    print(f"Received: {message.value}")

print("get all data from 2019, done")

Starting the consumer...
Received: {'id': '201900136', 'publicDate': '01/12/2019', 'source': 1, 'coAuthorship': 2299, 'citationCount': '43', 'refCount': '46', 'Class': ['PHYS']}
Received: {'id': '201900109', 'publicDate': '01/12/2019', 'source': 1, 'coAuthorship': 4, 'citationCount': None, 'refCount': '42', 'Class': ['VETE']}
Received: {'id': '201900131', 'publicDate': '01/12/2019', 'source': 1, 'coAuthorship': 7, 'citationCount': '8', 'refCount': '33', 'Class': ['MEDI', 'BIOC']}
Received: {'id': '201900107', 'publicDate': '01/12/2019', 'source': 1, 'coAuthorship': 4, 'citationCount': None, 'refCount': '31', 'Class': ['VETE']}
Received: {'id': '201900138', 'publicDate': '01/12/2019', 'source': 1, 'coAuthorship': 6, 'citationCount': '12', 'refCount': '43', 'Class': ['CENG', 'CHEM']}
Received: {'id': '201900100', 'publicDate': '01/12/2019', 'source': 1, 'coAuthorship': 3, 'citationCount': '2', 'refCount': '9', 'Class': ['BUSI', 'COMP', 'DECI', 'PHYS']}
Received: {'id': '201900154', 'publ

KeyboardInterrupt: 

# Spark for processing

In [18]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(data)
df = df.select("id", "publicDate", "source", "citationCount", "coAuthorship", "refCount", "Class") # reordering
df.show(10)

24/05/07 22:37:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/05/07 22:37:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


+---------+----------+------+-------------+------------+--------+--------------------+
|       id|publicDate|source|citationCount|coAuthorship|refCount|               Class|
+---------+----------+------+-------------+------------+--------+--------------------+
|201900136|01/12/2019|     1|           43|        2299|      46|              [PHYS]|
|201900109|01/12/2019|     1|         NULL|           4|      42|              [VETE]|
|201900131|01/12/2019|     1|            8|           7|      33|        [MEDI, BIOC]|
|201900107|01/12/2019|     1|         NULL|           4|      31|              [VETE]|
|201900138|01/12/2019|     1|           12|           6|      43|        [CENG, CHEM]|
|201900100|01/12/2019|     1|            2|           3|       9|[BUSI, COMP, DECI...|
|201900154|01/12/2019|     1|           58|          15|        |              [MEDI]|
|201900198|01/12/2019|     1|           32|          84|      24|              [MEDI]|
|201900153|01/12/2019|     1|           10|

                                                                                

In [19]:
df.count()

301

In [20]:
# cast type accordingly
df = df.withColumn('citationCount', df.citationCount.cast('int'))
df = df.withColumn('coAuthorship', df.coAuthorship.cast('int'))
df = df.withColumn('refCount', df.refCount.cast('int'))

In [21]:
from pyspark.sql.functions import avg, min, max, countDistinct, explode, split, col, round, sum

max_values = df.agg(
    max("citationCount").alias("maxCitation"),
    max("refCount").alias("maxRef"),
    max("coAuthorship").alias("maxCoAuthor")
).collect()[0]

# max value for each feature for normalization
max_citation = max_values["maxCitation"]
max_ref = max_values["maxRef"]
max_coauthor = max_values["maxCoAuthor"]

In [22]:
# find all Class
genre_counts = df.withColumn("Genre", explode(col("Class")))\
                 .groupBy("Genre")\
                 .count()  

print('number of all the class:', genre_counts.count())
genre_counts.show()

number of all the class: 27
+-----+-----+
|Genre|count|
+-----+-----+
| COMP|   33|
| MATE|   10|
| IMMU|   16|
| PHYS|   37|
| BIOC|   37|
| VETE|    9|
| ENGI|   27|
| PHAR|   11|
| MEDI|   78|
| ECON|    9|
| MULT|   35|
| ENVI|   24|
| DECI|    4|
| AGRI|   20|
| NURS|    5|
| CENG|   19|
| SOCI|   17|
| EART|    6|
| CHEM|   20|
| BUSI|    5|
+-----+-----+
only showing top 20 rows



In [23]:
# explode the class
exploded_df = df.withColumn("Class", explode(col("Class")))
# exploded_df.show(20)

In [24]:
# drop na for invalid rows
cleaned_df = exploded_df.dropna()
print(cleaned_df.count())
cleaned_df.show(5)

449
+---------+----------+------+-------------+------------+--------+-----+
|       id|publicDate|source|citationCount|coAuthorship|refCount|Class|
+---------+----------+------+-------------+------------+--------+-----+
|201900136|01/12/2019|     1|           43|        2299|      46| PHYS|
|201900131|01/12/2019|     1|            8|           7|      33| MEDI|
|201900131|01/12/2019|     1|            8|           7|      33| BIOC|
|201900138|01/12/2019|     1|           12|           6|      43| CENG|
|201900138|01/12/2019|     1|           12|           6|      43| CHEM|
+---------+----------+------+-------------+------------+--------+-----+
only showing top 5 rows



In [29]:
# compute the score for each paper
cleaned_df = cleaned_df.withColumn(
    "Score",
    round(
        col("source") * (
            0.4 * (col("citationCount") / max_citation * 10) +
            0.2 * (col("refCount") / max_ref * 10) +
            0.1 * (col("coAuthorship") / max_coauthor * 10)
        ), 4
    )
)

cleaned_df.show(10)

+---------+----------+------+-------------+------------+--------+-----+------+----+
|       id|publicDate|source|citationCount|coAuthorship|refCount|Class| Score|Year|
+---------+----------+------+-------------+------------+--------+-----+------+----+
|201900136|2019-12-01|     1|           43|        2299|      46| PHYS|2.9037|2019|
|201900131|2019-12-01|     1|            8|           7|      33| MEDI|0.7001|2019|
|201900131|2019-12-01|     1|            8|           7|      33| BIOC|0.7001|2019|
|201900138|2019-12-01|     1|           12|           6|      43| CENG| 0.958|2019|
|201900138|2019-12-01|     1|           12|           6|      43| CHEM| 0.958|2019|
|201900100|2019-12-01|     1|            2|           3|       9| BUSI| 0.186|2019|
|201900100|2019-12-01|     1|            2|           3|       9| COMP| 0.186|2019|
|201900100|2019-12-01|     1|            2|           3|       9| DECI| 0.186|2019|
|201900100|2019-12-01|     1|            2|           3|       9| PHYS| 0.18

In [34]:
from pyspark.sql.functions import to_date, year, quarter, col, sum, round, count
from pyspark.sql.window import Window

cleaned_df = cleaned_df.withColumn("publicDate", to_date(col("publicDate"), "dd/MM/yyyy"))
cleaned_df = cleaned_df.withColumn("Year", year(col("publicDate")))

grouped_df = cleaned_df.groupBy("Class", "Year").agg(
    round(sum("Score"), 4).alias("Total Score"),
    count("id").alias("Paper Count")  
)

# Display the result
grouped_df.show()

+-----+----+-----------+-----------+
|Class|Year|Total Score|Paper Count|
+-----+----+-----------+-----------+
| AGRI|2019|     18.983|         20|
| CENG|2019|    25.8145|         19|
| ENGI|2019|    31.7309|         27|
| MEDI|2019|    60.2074|         75|
| MATE|2019|    12.0498|         10|
| ENVI|2019|    33.1215|         24|
| EART|2019|    11.0173|          6|
| PHAR|2019|    12.1536|         11|
| CHEM|2019|    22.5635|         20|
| BIOC|2019|    26.7993|         37|
| NURS|2019|     4.3298|          5|
| DECI|2019|     2.3417|          4|
| PHYS|2019|    44.8678|         37|
| ECON|2019|     5.8369|          8|
| MULT|2019|    52.8098|         34|
| BUSI|2019|     2.4173|          5|
| SOCI|2019|     22.048|         16|
| COMP|2019|    23.1798|         33|
| IMMU|2019|     16.095|         16|
| PSYC|2019|     0.5589|          1|
+-----+----+-----------+-----------+
only showing top 20 rows



In [36]:
# Save the DataFrame to a CSV file
cumulative_df.write.csv(path="output_2019.csv", mode="overwrite", header=True)

spark.stop()

Py4JJavaError: An error occurred while calling o525.csv.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:239)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics(DataWritingCommand.scala:55)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics$(DataWritingCommand.scala:55)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics$lzycompute(InsertIntoHadoopFsRelationCommand.scala:47)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics(InsertIntoHadoopFsRelationCommand.scala:47)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics$lzycompute(commands.scala:109)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics(commands.scala:109)
	at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:63)
	at org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:120)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)
