In [1]:
from kafka import KafkaClient
from kafka.cluster import ClusterMetadata

# Create a connection to retrieve metadata
meta_cluster_conn = ClusterMetadata(
    bootstrap_servers="localhost:9092", # Specific the broker address to connect to
)

# retrieve metadata about the cluster
print(meta_cluster_conn.brokers())


# Create a connection to our KafkaBroker to check if it is running
client_conn = KafkaClient(
    bootstrap_servers="localhost:9092", # Specific the broker address to connect to
    client_id="Broker test" # Create an id from this client for reference
)

# Check that the server is connected and running
print(client_conn.bootstrap_connected())
# Check our Kafka version number
print(client_conn.check_version())

{BrokerMetadata(nodeId='bootstrap-0', host='localhost', port=9092, rack=None)}
True
(2, 5, 0)


In [10]:
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.cluster import ClusterMetadata

# Create a new Kafka client to adminstrate our Kafka broker
admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092", 
    client_id="Kafka Administrator"
)

# topics must be pass as a list to the create_topics method
topics = []
topics.append(NewTopic(name="MyFourthKafKaTopic", num_partitions=1, replication_factor=1))

# Topics to create must be passed as a list
admin_client.create_topics(new_topics=topics)


CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='MyFourthKafKaTopic', error_code=0, error_message=None)])

In [5]:
import kafka
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.cluster import ClusterMetadata
from kafka import KafkaClient
from kafka.cluster import ClusterMetadata
admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092", 
    client_id="Kafka Administrator"
)
admin_client.list_topics()

['MySecondKafKaTopic',
 'MyFourthKafKaTopic',
 'MyFirstKafKaTopic',
 'MyFixedKafKaTopic']

In [21]:
admin_client.describe_topics(topics=["MyFourthKafkaTopic"])

[{'error_code': 0,
  'topic': 'MyFourthKafkaTopic',
  'is_internal': False,
  'partitions': [{'error_code': 0,
    'partition': 0,
    'leader': 0,
    'replicas': [0],
    'isr': [0],
    'offline_replicas': []}]}]

In [1]:
# Lets create some test data to send using our kafka producer

data_set_1 = [
    {
        "Item": "samsung galaxy note 9",
        "Price": "600",
        "Quantity": "1"
    },
    {
        "Item": "Harmonic resonator",
        "Price": "excluded",
        "Quantity": 1
    }
]

In [2]:
from kafka import KafkaProducer
from json import dumps

# Configure our producer which will send data to  the topic
gamma_producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    client_id="12/10 producer",
    value_serializer=lambda products: dumps(products).encode("ascii")
) 




In [3]:
# Send data to the topic

from kafka import KafkaProducer
for products in data_set_1:
    gamma_producer.send(topic="MyFirstKafkaTopic", value=products)




In [36]:
from kafka import KafkaConsumer
from json import loads

# create our consumer to retrieve the message from the topics
data_stream_consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",    
    value_deserializer=lambda message: loads(message),
    auto_offset_reset="earliest" # This value ensures the messages are read from the beginning 
)

data_stream_consumer.subscribe(topics=["MLData", "Retaildata"])

In [37]:
# Loops through all messages in the consumer and prints them out individually
for message in data_stream_consumer:
    print(message.value)
    print(message.timestamp)
    print(message.topic)

{'Model_name': 'ResNet-50', 'Accuracy': '92.1', 'Framework_used': 'Pytorch'}
1663863993874
MLData
{'Model_name': 'Random Forest', 'Accuracy': '82.7', 'Framework_used': 'SKLearn'}
1663863993874
MLData
{'Model_name': 'ResNet-50', 'Accuracy': '92.1', 'Framework_used': 'Pytorch'}
1663865486923
MLData
{'Model_name': 'Random Forest', 'Accuracy': '82.7', 'Framework_used': 'SKLearn'}
1663865486926
MLData
{'Model_name': 'ResNet-50', 'Accuracy': '92.1', 'Framework_used': 'Pytorch'}
1663865489093
MLData
{'Model_name': 'Random Forest', 'Accuracy': '82.7', 'Framework_used': 'SKLearn'}
1663865489093
MLData
{'Model_name': 'ResNet-50', 'Accuracy': '92.1', 'Framework_used': 'Pytorch'}
1663865489510
MLData
{'Model_name': 'Random Forest', 'Accuracy': '82.7', 'Framework_used': 'SKLearn'}
1663865489513
MLData
{'Model_name': 'ResNet-50', 'Accuracy': '92.1', 'Framework_used': 'Pytorch'}
1663865490119
MLData
{'Model_name': 'Random Forest', 'Accuracy': '82.7', 'Framework_used': 'SKLearn'}
1663865490123
MLData


KeyboardInterrupt: 

In [35]:
for message in data_stream_consumer:
    print(message)
    

KeyboardInterrupt: 

In [None]:
import json
json.dumps()

In [6]:
from kafka import KafkaConsumer
import json
from json import loads
import boto3
import tempfile
s3_client = boto3.client('s3')
s3 = boto3.resource('s3')

# create our consumer to retrieve the message from the topics
data_stream_consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",    
    value_deserializer=lambda message: loads(message),
    auto_offset_reset="earliest" # This value ensures the messages are read from the beginning 
)

data_stream_consumer.subscribe(topics=["MyFirstKafkaTopic"])
for msg in data_stream_consumer:
    print("Topic name=%s,Message=%s"%(msg.topic,msg.value))
    print(type(msg.value))
    print(msg.value)
    file = json.dumps(msg.value)
    #s3.upload_fileobj(file, 'pinterestdata7b8c2d40-08c6-4eb4-8c96-2f0080c4653b', msg.value['Item'])
    s3_client.put_object(
     Body=file,
     Bucket='pinterestdata7b8c2d40-08c6-4eb4-8c96-2f0080c4653b',
     Key= msg.value['Item']
    )

Topic name=MyFirstKafkaTopic,Message={'Item': 'samsung galaxy note 9', 'Price': '600', 'Quantity': '1'}
<class 'dict'>
{'Item': 'samsung galaxy note 9', 'Price': '600', 'Quantity': '1'}
Topic name=MyFirstKafkaTopic,Message={'Item': 'Harmonic resonator', 'Price': 'excluded', 'Quantity': 1}
<class 'dict'>
{'Item': 'Harmonic resonator', 'Price': 'excluded', 'Quantity': 1}


KeyboardInterrupt: 

In [1]:
import findspark

findspark.init()

In [None]:
import configparser
import findspark
import os
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_id = config.get(aws_profile, "aws_access_key_id") 
access_key = config.get(aws_profile, "aws_secret_access_key")

In [2]:
sc.stop()

''

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as f
import os 
import pandas as pd
#SparkContext.stop()
# Adding the packages required to get data from S3  
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages com.amazonaws:aws-java-sdk-s3:1.12.196,org.apache.hadoop:hadoop-aws:3.3.1 pyspark-shell"
# Creating our Spark configuration
conf = SparkConf() \
    .setAppName('S3toSpark') \
    .setMaster('local[*]')

sc=SparkContext.getOrCreate(conf=conf)

# Configure the setting to read from the S3 bucket
accessKeyId=os.environ["AWS_ACCESSKEY_ID"]
secretAccessKey=os.environ["AWS_SECRET_KEY"]
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3a.access.key', accessKeyId)
hadoopConf.set('fs.s3a.secret.key', secretAccessKey)
hadoopConf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') # Allows the package to authenticate with AWS

# Create our Spark session
spark=SparkSession(sc)
#sc.stop()
# Read from the S3 bucket
df = spark.read.json("s3a://pinterestdata7b8c2d40-08c6-4eb4-8c96-2f0080c4653b/*.json") # You may want to change this to read csv depending on the files your reading from the bucket
df1 = df.select(df["category"],df["unique_id"],df["title"],df["description"],df["tag_list"],df["follower_count"]).show()
#df.groupby("category")
#s_df.show()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
#pysparkdf = spark.createDataFrame(df1)
#df.toPandas()

df = df.withColumn('follower_count', f.regexp_replace("follower_count", "User Info Error", "0"))
df = df.withColumn('follower_count', f.regexp_replace("follower_count", "k", "000"))
df = df.withColumn('follower_count', f.regexp_replace("follower_count", "M", "000000"))
df = df.withColumn('follower_count', f.col("follower_count").cast("Int"))
df = df.withColumn('tag_list', f.regexp_replace("tag_list", "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e", "None"))
df = df.sort("category").show()


+--------------+--------------------+--------------------+--------------------+--------------------+---------------+
|      category|           unique_id|               title|         description|            tag_list| follower_count|
+--------------+--------------------+--------------------+--------------------+--------------------+---------------+
|     education|21b59ba9-829d-4c3...|Podcasts for Teac...|Podcasts for Teac...|Middle School Cla...|            25k|
|       finance|8fb2af68-543b-463...|Dave Ramsey's 7 B...|If you love budge...|Financial Peace,F...|            26k|
|event-planning|b75b6f87-deb3-444...|The Vault: Curate...|Sacramento Califo...|60th Anniversary ...|             6M|
|       finance|1e1f0c8b-9fcf-460...|Island Oasis Coup...|Description Coupo...|Grocery Items,Gro...|              0|
|diy-and-crafts|52fa3af5-24a4-4cc...|UFO Paper Plate C...|A fun space activ...|Paper Plate Craft...|           192k|
|diy-and-crafts|9bf39437-42a6-4f0...|25 Super Fun Summ...|Keep t

In [14]:
x = input("please enter a number: ")
y = int(x)
def func(y):
    count = 0
    while (count <= y):
        count +=1
        if (count%5) == 0 and (count%3) == 0:
            print("fizzbuzz")
        if (count%3) == 0:
            print("fizz")
        else:
            print(count)
    
func(y)


1
2
fizz
4
5
fizz
7
8
fizz
10
11
fizz
13
14
fizzbuzz
fizz
16
17


In [3]:
def operateNumber(N1, N2):
    if (N1%2) == 0 and (N2%2) == 0:
        print(N1+N2)
    else:
        print(N1*N2)

operateNumber(8,2)

10


In [6]:
def sumOdd(n,Arr):
    base = 0
    for number in Arr:
        if number%2 != 0:
            base += number
            print(base)
        else:
            pass

sumOdd(5,[33,3,22,1,10])

33
36
37


In [7]:
def charinstring(S,c):
    count = 0
    for character in S:
        if character.capitalize() == c.capitalize():
            count +=1
            print(count)
        else:
            pass
charinstring("hello world","o")

1
2


In [7]:
import os
password = os.environ["PGADMIN4_PASSWORD"]
print(password)

Yoruichi786


In [5]:
#
# screen_output is the output of the cli on a router from the command
# "show interface" on a router.
#
# write a python function which takes screen_output as a parameter
# and returns an appropriate data structure for the data to be used
# by the caller.
# When choosing the returned data structure try to think about the
# questions a user of the function would want to get from the returned
# data.
# For example;
#    is interface <foo> link state up?
#    what speed is interface <foo> using?
#    what IP addresses does interface <foo> have?
#
# Please show how you tested your solution.
#
#
#
import re
screen_output = """
vyatta@dut-2:~$ show interfaces
Codes: S - State, L - Link, u - Up, D - Down, A - Admin Down
Interface       IP Address(es)                    S/L  Speed/Duplex  Description
---------       --------------                    ---  ------------  -----------
dp0ce0          -                                 u/D  auto/auto
dp0ce1          -                                 u/D  auto/auto
dp0p7s0         10.156.43.100/24                  u/u  a-1g/a-full   Mgmt Network
dp0xe0          -                                 u/u  10g/full
dp0xe1          -                                 u/u  10g/full
dp0xe11         10.156.21.2/20                    u/u  1g/full
dp0xe12         -                                 A/D  auto/auto     link-to-TimeProvider4100-eth4
dp0xe13         -                                 u/D  auto/auto
lo1             1.1.1.1/32                        u/u  -/-
sw0             -                                 u/u  -/-
sw0.50          10.0.0.2/30                       u/u  -/-
sw0.100         192.168.4.1/24                    u/D  -/-
sw0.1001        100.100.0.1/24                    u/u  -/-
                100:100::1/64
sw0.1002        192.168.1.2/30                    u/u  -/-
                192.168.2.2/30
                cef:0:4:3:2:1:0:4/128
sw0.1026        100.100.8.1/24                    u/u  -/-
                100:100:8::1/64
sw0.4004        4.4.0.1/24                        u/D  -/-
"""

def read_data(screen_output):
    lines=screen_output.split("\n")
    #extract=re.compile(r"././")
    list_of_dicts=[]
    dict={}
    for idx,line in enumerate(lines[5:-1]):
        #print(line[0:8].strip(" "))
        n=list(re.split(r'\s+',line))
        list_of_dicts.append({"Interface":line[0:8].strip(" ")})
        if line[0:8] == '':
            list_of_dicts[idx-1].append({"IP Adress(es)":n[1]})
        
        try:
            list_of_dicts[idx].update({"IP Adress(es)":n[1]})
        except:
            list_of_dicts[idx].update({"IP Adress(es)":' '}) 
        try:
            list_of_dicts[idx].update({"S/L":n[2]})
        except:
            list_of_dicts[idx].update({"S/L":''})
        try:
            list_of_dicts[idx].update({"Speed/Duplex":n[3]})
        except:
            list_of_dicts[idx].update({"Speed/Duplex":''})
            #v=re.search(r'[ADu]{1}\/[ADu]{1}',line)
            #b=re.match(r'\S*\/*',line)
            #n=re.split(r'\s+',line)
        
    #print(list_of_dicts)
    return list_of_dicts

def edit_dict(screen_output):
    list_of_dicts=read_data(screen_output)
    for i in list_of_dicts:
        if list_of_dicts[i]["Interface"]=='':
            list_of_dicts[i-1].append({"IP Adress(es)":list_of_dicts[i]["IP Adress(es)"]})
    return list_of_dicts

list_of_dicts=edit_dict(screen_output)


    
 

TypeError: list indices must be integers or slices, not dict

In [1]:
def fibonacci_sequence(limit):
    num1, num2 = 0, 1
    while num1 <= limit:
        yield num1
        num1, num2 = num2, num1 + num2

def sum_of_even_fibonacci(limit):
    even_sum = 0
    count = 0
    for num in fibonacci_sequence(limit):
        if num % 2 == 0:
            even_sum += num
            count += 1
        if count == 100:
            break
    return even_sum

print(sum_of_even_fibonacci(4000000))

4613732


In [3]:
#final
def fibonacci(num):
    n1,n2=0,1
    while n1<=num:
        yield n1
        n1,n2 = n2,n1+n2

def sum_of_even_fib(num):
    even_sum = 0 
    count = 0
    for number in fibonacci(num):
        if number % 2 ==0:
            even_sum += number
            count += 1
        if count == 100:
            break
    return even_sum

print(sum_of_even_fib(4000000))
#final

4613732


In [5]:
#final
def has_only_even_digits(num):
    for increment in str(num):
        if int(increment) % 2 != 0:
            return False
    return True

print(has_only_even_digits(2468)) # True
print(has_only_even_digits(1357)) # False
#final

True
False


In [6]:
#final
def calculate_iter_sum(val):
    total = 0
    for i in range(1, 5):
        total += int(str(val) * i)
    return total

print(calculate_iter_sum(9)) # 9 + 99 + 999 + 9999 = 11106
#final

11106


In [9]:
#final
def common_member(a, b):
    a_set = set(a)
    b_set = set(b)
 
    if (a_set & b_set):
        print(a_set & b_set)
        return list(a_set & b_set)
    else:
        print("No common elements")

a = [1, 2, 3, 4, 5]
b = [6, 1, 3, 5]
common_member(a, b)
#final

{1, 3, 5}


[1, 3, 5]

In [7]:
def num_gen(val,n):
    for i in range(1,val+1):
        if i%n == 0:
            yield i

sum(num_gen(102030,3))

1735071165

In [14]:
def f(n):
    result = []
    for i in range(n+1):
        result.append([j for j in range(1, i+1)])
    return result

def list_maker(n):
    for i in range(n+1):
        print(f(i))

list_maker(3)

[[]]
[[], [1]]
[[], [1], [1, 2]]
[[], [1], [1, 2], [1, 2, 3]]
