# Python based CCloud Producer Demo¶

How can I get some Wikipedia pageview data for timeseries analysis using OpenTSx into my Kafka Cluster?
https://wikitech.wikimedia.org/wiki/Analytics/AQS/Pageviews


# Wikipedia REST API
- https://wikimedia.org/api/rest_v1/
- https://github.com/MusikAnimal/pageviews

Whay is pagecount not such a good idea? 
- https://ajr.org/2014/03/18/pageview-passe-new-metrics-emerge-measure-audiences/

Technical Description of the Wikipedia API
- https://readthedocs.org/projects/wikipedia-api/downloads/pdf/latest/


In [1]:
import sys

In [2]:
!pip3 install wikipedia-api

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [3]:
!pip install "confluent-kafka[avro,json,protobuf]"
!pip install --no-binary :all: argparse
!pip install --no-binary :all: requests
!pip install --no-binary :all: certifi
!pip install mwviews
!pip install git+https://github.com/Commonists/pageview-api.git

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
Collecting argparse
  Using cached argparse-1.4.0.tar.gz (70 kB)
Skipping wheel build for argparse, due to binaries being disabled for it.
Installing collected packages: argparse
    Running setup.py install for argparse ... [?25ldone
[?25hSuccessfully installed argparse-1.4.0
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
Collecting git+https://github.com/Commonists/pageview-api.git
  C

In [4]:
!pip install dataclasses
!pip install dataclasses-json

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [5]:
import argparse

from confluent_kafka import avro, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from uuid import uuid4
import wikipediaapi
import json

In [6]:
from dataclasses import dataclass
from dataclasses_json import dataclass_json


@dataclass_json
@dataclass
class ContextLink:
    key: str
    lang: str
    title: str
    fullurl: str
    text: str
    length: str


In [7]:
@dataclass_json
@dataclass
class PageLink:
    key: str
    lang: str
    title: str
    fullurl: str
    text: str
    length: str 
  

In [8]:
class Wikipage(object):
    
    context = "DEV001"
    
    """
        Wikipage stores the meta-data for a Wikipage used in the analysis Kafka key.
    """

    # Use __slots__ to explicitly declare all data members.
    __slots__ = ["lang", "pagename", "categories", "id", "has_context_graph", "pagetext" , "lang_links", "page_links", "wiki"]

    def __init__(self, pagename=None, lang="de" ):
  
        self.wiki = wikipediaapi.Wikipedia(lang, extract_format=wikipediaapi.ExtractFormat.WIKI)
        self.lang = lang
        self.pagename = pagename

        # Unique id used to track produce request success/failures.
        # Do *not* include in the serialized object.
        self.id = uuid4()
        
        self.has_context_graph = False;
        self.lang_links = []
        self.page_links = []
        self.categories = []
        self.pagetext = ""

    @staticmethod
    def dict_to_name(obj, ctx):
        return Wikipage(obj['pagename'])

    @staticmethod
    def name_to_dict(name, ctx):
        return Wikipage.to_dict(name)

    def to_dict(self):
        """
            We must provide a dict representation of our class for serialization.
        """
        return dict(pagename=self.pagename,text=self.pagetext, length=len(self.pagetext))
    
    def load_text_from_wikipedia_api(self,name):
        #print( "Load pagetext ..." )
        page_py = self.wiki.page(name)
        #print( len(page_py.text) )
        self.pagetext = page_py.text
        self.categories = page_py.categories
        #print( self.pagetext )
        return
    
    def print_langlinks(self,page):
        langlinks = page.langlinks
        for k in sorted(langlinks.keys()):
            v = langlinks[k]
            print("%s: %s - %s: %s" % (k, v.language, v.title, v.fullurl))
            
    def print_pagelinks(self,page):
        links = page.links
        for title in sorted(links.keys()):
            print("*** %s: %s [%s]" % (title, links[title], type(links[title])))        

    def load_pagelinks(self,page):
        nr_of_missing_urls = 0
        links = page.links
        # for title in sorted(links.keys()):
            # print("### %s: %s [%s]" % (title, links[title], type(links[title])))        

        for k in sorted(links.keys()):
            v = links[k]
            #print( v, type(v) )
            
            try:
                #print( v.canonicalurl, v.fullurl )
                l = PageLink(k, v.language, v.title, v.lurl, v.text, len(v.text))
                self.page_links.append(l)
            except Exception as inst:
                nr_of_missing_urls = nr_of_missing_urls + 1
                l = PageLink(k, v.language, v.title, "NO_URL", v.text, len(v.text))
                self.page_links.append(l) 
                
        print("Oops! {} link-URLs missing on page {}.{}.\n".format(nr_of_missing_urls, page.language, page.title ))

                
    def load_langlinks(self,page):
        langlinks = page.langlinks
        if ( len(langlinks) > 0 ):
            self.has_context_graph = True
        for k in sorted(langlinks.keys()):
            v = langlinks[k]
            #print("%s: %s - %s: %s" % (k, v.language, v.title, v.fullurl))
            l = ContextLink(k, v.language, v.title, v.fullurl, v.text, len(v.text))
            self.lang_links.append(l)
    
    def load_links_from_wikipedia_api(self,name):
        page_py = self.wiki.page(name)
        self.load_langlinks(page_py)
        self.load_pagelinks(page_py)
        return
    
    def exists(self):
        page_py = self.wiki.page( self.pagename )
        return page_py.exists()
        
    def load_context_graph(self,name):
        self.load_text_from_wikipedia_api(name)
        self.load_links_from_wikipedia_api(name)

    def get_key_dict(self):
        key_dict = { 'study_context':self.context, 'lang':'de', 'pagename': self.pagename }
        return key_dict

    def get_key(self):
        return json.dumps(self.get_key_dict())
    
    def get_page_links(self):
        page_links_dict = {}
        for i in self.page_links:
            page_links_dict[i.key] = i
        return page_links_dict
        
    def get_lang_links(self):
        lang_links_dict = {}
        for i in self.lang_links:
            lang_links_dict[i.key] = i
        return lang_links_dict
    
    def get_value_context_graph(self):
        #print( self.get_page_links() )
        #print( self.get_lang_links() )
        graph_value_dict = { 'key':self.get_key_dict(), 'CentralNode':self.to_dict(), 'zCategories':len(self.categories), 'zPageLinks': len(self.get_page_links()), 'zContextLinks': len(self.get_lang_links())  }
        return json.dumps( graph_value_dict )
        
 

In [9]:
p1 = Wikipage(pagename='Stollberg')
    
print( p1.get_key() )
print( p1.get_value_context_graph() )


{"study_context": "DEV001", "lang": "de", "pagename": "Stollberg"}
{"key": {"study_context": "DEV001", "lang": "de", "pagename": "Stollberg"}, "CentralNode": {"pagename": "Stollberg", "text": "", "length": 0}, "zCategories": 0, "zPageLinks": 0, "zContextLinks": 0}


In [10]:

p1.load_context_graph( p1.pagename )

if p1.exists() == True:
    print( "PAGE {} HAS Context Graph: {} ".format(p1.pagename, p1.has_context_graph ) )
    #print( "PAGETEXT: {}".format(p1.pagetext) )
    #print( "        ## Context Graph: ".format(p1.pagename) )   
    #print( "        >> Inter-Wiki Links:\n{}".format(p1.lang_links) )   
    #print( "        >> Page Links:\n{}".format(p1.page_links) )   
else:
    print( "{} DOES NOT EXIST!".format(p1.pagename))


Oops! 35 link-URLs missing on page de.Stollberg.

PAGE Stollberg HAS Context Graph: True 


In [11]:
print( p1.get_key() )
print( p1.get_value_context_graph() )

{"study_context": "DEV001", "lang": "de", "pagename": "Stollberg"}
{"key": {"study_context": "DEV001", "lang": "de", "pagename": "Stollberg"}, "CentralNode": {"pagename": "Stollberg", "text": "Stollberg steht f\u00fcr folgende geographischen Objekte:\nOrte:\n\nStollberg/Erzgebirge, Stadt im Erzgebirgskreis, Sachsen\nStollberg (Bordelum), Ortsteil von Bordelum, Kreis Nordfriesland, Schleswig-Holstein\nStollberg (Untergriesbach), Ortsteil von Untergriesbach, Landkreis Passau, Bayern\nStollberg, Katastralgemeinde von Brand-Laaben, Bezirk Sankt P\u00f6lten-Land, Nieder\u00f6sterreich\nStollberg (Thalgau), Ortsteil von Thalgau, Bezirk Salzburg-Umgebung, \u00d6sterreich\nStollberg (Littau LU), Ortsteil von Littau, Kanton Luzern, SchweizKreise:\n\nLandkreis Stollberg, Landkreis in Sachsen\nKreis Stollberg, Landkreis im Bezirk Karl-Marx-Stadt, Deutsche Demokratische Republik\u00c4mter:\n\nAmt Stollberg im Kreis Nordfriesland, Schleswig-Holstein\nAmt Stollberg (Erzgebirge), Amt in SachsenBerge:

In [12]:
class Count(object):
    """
        Count stores the pageview counts.
    """

    # Use __slots__ to explicitly declare all data members.
    __slots__ = ["count", "id"]

    def __init__(self, count=None):
        self.count = count
        # Unique id used to track produce request success/failures.
        # Do *not* include in the serialized object.
        self.id = uuid4()

    @staticmethod
    def dict_to_count(obj, ctx):
        return Count(obj['count'])

    @staticmethod
    def count_to_dict(count, ctx):
        return Count.to_dict(count)

    def to_dict(self):
        """
            We must provide a dict representation of our class for serialization.
        """
        return dict(count=self.count)

In [13]:
#
# Helper function to read CCloud configuration.
#
def read_ccloud_config(config_file):
    """Read Confluent Cloud configuration for librdkafka clients"""

    conf = {}
    with open(config_file) as fh:
        for line in fh:
            line = line.strip()
            if len(line) != 0 and line[0] != "#":
                parameter, value = line.strip().split('=', 1)
                conf[parameter] = value.strip()

    return conf

#
# Helper function to create Kafka Topic.
#
def create_topic(conf, topic):
    """
        Create a topic if needed
        Examples of additional admin API functionality:
        https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/adminapi.py
    """

    a = AdminClient({
           'bootstrap.servers': conf['bootstrap.servers'],
           'sasl.mechanisms': 'PLAIN',
           'security.protocol': 'SASL_SSL',
           'sasl.username': conf['sasl.username'],
           'sasl.password': conf['sasl.password']
    })
    fs = a.create_topics([NewTopic(
         topic,
         num_partitions=1,
         replication_factor=3
    )])
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            # Continue if error code TOPIC_ALREADY_EXISTS, which may be true
            # Otherwise fail fast
            if e.args[0].code() != KafkaError.TOPIC_ALREADY_EXISTS:
                print("Failed to create topic {}: {}".format(topic, e))
                sys.exit(1)

In [14]:

# =============================================================================
#
# Produce messages to Confluent Cloud
#
# Using Confluent Python Client for Apache Kafka
# Writes JSON data, no integration with Confluent Cloud Schema Registry
#
# =============================================================================
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer



In [24]:



# Define arguments and configurations and initialize

#config_file = "/Users/mkaempf/.confluent/python.config"
#
# The current path is inside the cloned project ... so we can use relative paths.
#
config_file = "./../../config/private/ccloud.props"

topic_context_graph = "aws_sm_topic_context_graph"
topic_pageview_episodes = "aws_sm_topic_pageview_episodes"

conf = read_ccloud_config(config_file)

print( conf )

#
# The producer doesn't like some properties:
#
# for full list of configurations, see:
#  https://docs.confluent.io/platform/current/clients/confluent-kafka-python/#serializingproducer
conf.pop( 'schema.registry.url' )
conf.pop( 'basic.auth.credentials.source' )
conf.pop( 'basic.auth.user.info' )
conf.pop( 'key.serializer' )
conf.pop( 'value.serializer' )



{'request.timeout.ms': '20000', 'retry.backoff.ms': '500', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'io.confluent.kafka.serializers.KafkaAvroSerializer', 'bootstrap.servers': 'pkc-zm3p0.eu-north-1.aws.confluent.cloud:9092', 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'PLAIN', 'sasl.username': 'RHV2GUBWOQ2K63NQ', 'sasl.password': '6mhQ8hoxSShsg2DILjFYO9m5KGpl/6c3pN09sO1Zb4aeyOoZxhirl7LclHJ8k0fV', 'schema.registry.url': 'https://psrc-xm8wx.eu-central-1.aws.confluent.cloud', 'basic.auth.credentials.source': 'USER_INFO', 'basic.auth.user.info': 'RJ2CVSTZX5S4GPAC:f7ZaChPx88StOzi2PIYqOExym/gj1S49jRF9/dtz0Bygmwa2HPYYvQOveX7Hk7Bd'}


'io.confluent.kafka.serializers.KafkaAvroSerializer'

In [25]:
create_topic( conf, topic_context_graph )
create_topic( conf, topic_pageview_episodes )

producer = Producer(conf)

delivered_records = 0

Topic aws_sm_topic_context_graph created
Topic aws_sm_topic_pageview_episodes created


In [26]:
# Optional per-message on_delivery handler (triggered by poll() or flush())
# when a message has been successfully delivered or
# permanently failed delivery (after retries).
def acked(err, msg):
    global delivered_records
    """Delivery report handler called on
    successful or failed delivery of message
    """
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        delivered_records += 1
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))
        

In [32]:
pages = ["Stollberg", "Bern", "Helsinki", "Paris", "London", "Berlin", "New_York_City" ]

import pageviewapi

from datetime import datetime

# datetime object containing current date and time
tStart = datetime.now()
 
print("now =", tStart)

# dd/mm/YY H:M:S
dt_string = tStart.strftime("%d/%m/%Y %H:%M:%S")
print("date and time =", dt_string)

n = 1
for pn in pages:
    
    print( "### Processing [{}]".format(pn) )
    
    n = n+1
    
    page_object = Wikipage(pagename=pn)
    page_object.load_context_graph( page_object.pagename )
    
    count_object = Count()
    count_object.count = n
    
    print("Producing JSON record: {}\t{}".format(page_object.pagename, count_object.count))
    
    if page_object.has_context_graph == True:
           
        graph_data = page_object.get_value_context_graph()
 
        ts_data = pageviewapi.per_article( '{}.wikipedia'.format(page_object.lang), pn, '20200101', '20201231',
                        access='all-access', agent='all-agents', granularity='daily')
        
        producer.produce(topic=topic_context_graph, key=page_object.get_key(), value=json.dumps(graph_data), on_delivery=acked)
        producer.produce(topic=topic_pageview_episodes, key=page_object.get_key(), value=json.dumps(ts_data), on_delivery=acked)

        producer.poll(0)

    else:
        print("No data for page {}".format(page_object.pagename))
        

producer.flush()

print("{} messages were produced to topic {}!".format(delivered_records, topic))

tEnd = datetime.now()
 
print("now =", tEnd)

print("loadtime => ", tEnd - tStart)

now = 2021-11-21 13:25:18.181467
date and time = 21/11/2021 13:25:18
### Processing [Stollberg]
Oops! 35 link-URLs missing on page de.Stollberg.

Producing JSON record: Stollberg	2
### Processing [Bern]


ConnectionError: HTTPSConnectionPool(host='hsb.wikipedia.org', port=443): Max retries exceeded with url: /w/api.php?action=query&prop=extracts&titles=Bern&explaintext=1&exsectionformat=wiki&format=json&redirects=1 (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f1ee479fac8>: Failed to establish a new connection: [Errno 101] Network is unreachable',))

In [22]:

pageviewapi.per_article('en.wikipedia', 'Paris', '20151106', '20151120',
                        access='all-access', agent='all-agents', granularity='daily')

AttrDict({'items': [{'project': 'en.wikipedia', 'article': 'Paris', 'granularity': 'daily', 'timestamp': '2015110600', 'access': 'all-access', 'agent': 'all-agents', 'views': 9168}, {'project': 'en.wikipedia', 'article': 'Paris', 'granularity': 'daily', 'timestamp': '2015110700', 'access': 'all-access', 'agent': 'all-agents', 'views': 7939}, {'project': 'en.wikipedia', 'article': 'Paris', 'granularity': 'daily', 'timestamp': '2015110800', 'access': 'all-access', 'agent': 'all-agents', 'views': 8337}, {'project': 'en.wikipedia', 'article': 'Paris', 'granularity': 'daily', 'timestamp': '2015110900', 'access': 'all-access', 'agent': 'all-agents', 'views': 9355}, {'project': 'en.wikipedia', 'article': 'Paris', 'granularity': 'daily', 'timestamp': '2015111000', 'access': 'all-access', 'agent': 'all-agents', 'views': 9485}, {'project': 'en.wikipedia', 'article': 'Paris', 'granularity': 'daily', 'timestamp': '2015111100', 'access': 'all-access', 'agent': 'all-agents', 'views': 9140}, {'projec

In [23]:
pageviewapi.legacy_pagecounts('fr.wikipedia', '2010010100', '2011010100', granularity='daily')

AttrDict({'items': [{'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp': '2010010100', 'count': 14939284}, {'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp': '2010010200', 'count': 15944604}, {'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp': '2010010300', 'count': 15641373}, {'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp': '2010010400', 'count': 14295617}, {'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp': '2010010500', 'count': 13814118}, {'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp': '2010010600', 'count': 17645546}, {'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp': '2010010700', 'count': 17253156}, {'project': 'fr.wikipedia', 'access-site': 'all-sites', 'granularity': 'daily', 'timestamp'